From 72231250ed81e10d66bfe70701e64fa5fe50f712 Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Wed, 15 Jun 2016 23:09:44 +0200 Subject: Publish --- .../com/yahoo/search/federation/CommonFields.java | 22 + .../search/federation/FederationSearcher.java | 948 ++++++++++++++++++++ .../search/federation/ForwardingSearcher.java | 106 +++ .../com/yahoo/search/federation/FutureWaiter.java | 58 ++ .../yahoo/search/federation/TimeoutException.java | 20 + .../http/ConfiguredHTTPClientSearcher.java | 36 + .../http/ConfiguredHTTPProviderSearcher.java | 68 ++ .../federation/http/ConfiguredSearcherHelper.java | 27 + .../yahoo/search/federation/http/Connection.java | 30 + .../federation/http/GzipDecompressingEntity.java | 125 +++ .../search/federation/http/HTTPClientSearcher.java | 276 ++++++ .../search/federation/http/HTTPParameters.java | 315 +++++++ .../federation/http/HTTPProviderSearcher.java | 260 ++++++ .../yahoo/search/federation/http/HTTPSearcher.java | 958 +++++++++++++++++++++ .../search/federation/http/TimedHttpEntity.java | 88 ++ .../yahoo/search/federation/http/TimedStream.java | 111 +++ .../search/federation/http/TimeoutException.java | 20 + .../yahoo/search/federation/http/package-info.java | 7 + .../com/yahoo/search/federation/package-info.java | 17 + .../federation/selection/FederationTarget.java | 68 ++ .../federation/selection/TargetSelector.java | 35 + .../search/federation/selection/package-info.java | 7 + .../sourceref/SearchChainInvocationSpec.java | 37 + .../federation/sourceref/SearchChainResolver.java | 160 ++++ .../search/federation/sourceref/SingleTarget.java | 36 + .../federation/sourceref/SourceRefResolver.java | 71 ++ .../search/federation/sourceref/SourcesTarget.java | 112 +++ .../yahoo/search/federation/sourceref/Target.java | 31 + .../sourceref/UnresolvedProviderException.java | 22 + .../sourceref/UnresolvedSearchChainException.java | 13 + .../sourceref/UnresolvedSourceRefException.java | 21 + .../search/federation/vespa/QueryMarshaller.java | 170 ++++ .../search/federation/vespa/ResultBuilder.java | 642 ++++++++++++++ .../search/federation/vespa/VespaSearcher.java | 270 ++++++ .../search/federation/vespa/package-info.java | 7 + 35 files changed, 5194 insertions(+) create mode 100644 container-search/src/main/java/com/yahoo/search/federation/CommonFields.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/Connection.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/http/package-info.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/package-info.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java create mode 100644 container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java (limited to 'container-search/src/main/java/com/yahoo/search/federation') diff --git a/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java b/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java new file mode 100644 index 00000000000..912a1db6202 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation; +/** + * A set of string constants for common hit field names. + * @author laboisse + * + */ +public class CommonFields { + + public static final String TITLE = "title"; + public static final String URL = "url"; + public static final String DESCRIPTION = "description"; + public static final String DATE = "date"; + public static final String SIZE = "size"; + public static final String DISP_URL = "dispurl"; + public static final String BASE_URL = "baseurl"; + public static final String MIME_TYPE = "mimetype"; + public static final String RELEVANCY = "relevancy"; + public static final String THUMBNAIL_URL = "thumbnailUrl"; + public static final String THUMBNAIL_WIDTH = "thumbnailWidth"; + public static final String THUMBNAIL_HEIGHT = "thumbnailHeight"; +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java new file mode 100644 index 00000000000..4ec04d0d577 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java @@ -0,0 +1,948 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation; + +import com.google.inject.Inject; +import com.yahoo.component.ComponentId; +import com.yahoo.component.ComponentSpecification; +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.dependencies.After; +import com.yahoo.component.chain.dependencies.Provides; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.errorhandling.Results.Builder; +import com.yahoo.prelude.IndexFacts; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.Searcher; +import com.yahoo.search.federation.selection.FederationTarget; +import com.yahoo.search.federation.selection.TargetSelector; +import com.yahoo.search.federation.sourceref.SearchChainInvocationSpec; +import com.yahoo.search.federation.sourceref.SearchChainResolver; +import com.yahoo.search.federation.sourceref.SingleTarget; +import com.yahoo.search.federation.sourceref.SourceRefResolver; +import com.yahoo.search.federation.sourceref.SourcesTarget; +import com.yahoo.search.federation.sourceref.Target; +import com.yahoo.search.federation.sourceref.UnresolvedSearchChainException; +import com.yahoo.search.query.Properties; +import com.yahoo.search.query.properties.QueryProperties; +import com.yahoo.search.query.properties.SubProperties; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.result.HitGroup; +import com.yahoo.search.result.HitOrderer; +import com.yahoo.search.searchchain.AsyncExecution; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.search.searchchain.ForkingSearcher; +import com.yahoo.search.searchchain.FutureResult; +import com.yahoo.search.searchchain.SearchChainRegistry; +import com.yahoo.search.searchchain.model.federation.FederationOptions; +import com.yahoo.errorhandling.Results; + +import org.apache.commons.lang.StringUtils; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.yahoo.collections.CollectionUtil.first; +import static com.yahoo.container.util.Util.quote; +import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceProperties; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * This searcher takes a set of sources, looks them up in config and fire off the correct searchchains. + * + * @author Arne Bergene Fossaa + * @author tonytv + */ +@Provides(FederationSearcher.FEDERATION) +@After("*") +public class FederationSearcher extends ForkingSearcher { + public static final String FEDERATION = "Federation"; + + private static abstract class TargetHandler { + abstract Chain getChain(); + abstract void modifyTargetQuery(Query query); + abstract void modifyTargetResult(Result result); + + ComponentId getId() { + return getChain().getId(); + } + + public abstract FederationOptions federationOptions(); + + @Override + public String toString() { + return getChain().getId().stringValue(); + } + + } + + private static class StandardTargetHandler extends TargetHandler { + private final SearchChainInvocationSpec target; + private final Chain chain; + + public StandardTargetHandler(SearchChainInvocationSpec target, Chain chain) { + this.target = target; + this.chain = chain; + } + + @Override + Chain getChain() { + return chain; + } + + @Override + void modifyTargetQuery(Query query) {} + @Override + void modifyTargetResult(Result result) {} + + @Override + public FederationOptions federationOptions() { + return target.federationOptions; + } + } + + + private static class CustomTargetHandler extends TargetHandler { + private final TargetSelector selector; + private final FederationTarget target; + + CustomTargetHandler(TargetSelector selector, FederationTarget target) { + this.selector = selector; + this.target = target; + } + + @Override + Chain getChain() { + return target.getChain(); + } + + @Override + public void modifyTargetQuery(Query query) { + selector.modifyTargetQuery(target, query); + } + + @Override + public void modifyTargetResult(Result result) { + selector.modifyTargetResult(target, result); + } + + @Override + public FederationOptions federationOptions() { + return target.getFederationOptions(); + } + } + + + + private static class ExecutionInfo { + final TargetHandler targetHandler; + final FederationOptions federationOptions; + final FutureResult futureResult; + + public ExecutionInfo(TargetHandler targetHandler, FederationOptions federationOptions, FutureResult futureResult) { + this.targetHandler = targetHandler; + this.federationOptions = federationOptions; + this.futureResult = futureResult; + } + } + + private static class CompoundKey { + private final String sourceName; + private final String propertyName; + CompoundKey(String sourceName, String propertyName) { + this.sourceName = sourceName; + this.propertyName = propertyName; + } + + @Override + public int hashCode() { + return sourceName.hashCode() ^ propertyName.hashCode(); + } + + @Override + public boolean equals(Object o) { + CompoundKey rhs = (CompoundKey) o; + return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName); + } + + @Override + public String toString() { + return sourceName + '.' + propertyName; + } + } + + private static class SourceKey extends CompoundKey { + public static final String SOURCE = "source."; + SourceKey(String sourceName, String propertyName) { + super(sourceName, propertyName); + } + + @Override + public int hashCode() { + return super.hashCode() ^ 7; + } + + @Override + public boolean equals(Object o) { + return (o instanceof SourceKey) && super.equals(o); + } + + @Override + public String toString() { + return SOURCE + super.toString(); + } + } + private static class ProviderKey extends CompoundKey { + public static final String PROVIDER = "provider."; + ProviderKey(String sourceName, String propertyName) { + super(sourceName, propertyName); + } + + @Override + public int hashCode() { + return super.hashCode() ^ 17; + } + + @Override + public boolean equals(Object o) { + return (o instanceof ProviderKey) && super.equals(o); + } + + @Override + public String toString() { + return PROVIDER + super.toString(); + } + } + + private static final Logger log = Logger.getLogger(FederationSearcher.class.getName()); + + /** The name of the query property containing the source name added to the query to each source by this */ + public final static CompoundName SOURCENAME = new CompoundName("sourceName"); + public final static CompoundName PROVIDERNAME = new CompoundName("providerName"); + + + /** Logging field name constants */ + public static final String LOG_COUNT_PREFIX = "count_"; + + private final SearchChainResolver searchChainResolver; + private final PropagateSourceProperties.Enum propagateSourceProperties; + private final SourceRefResolver sourceRefResolver; + private final CopyOnWriteHashMap map = new CopyOnWriteHashMap<>(); + + private final boolean strictSearchchain; + private final TargetSelector targetSelector; + + + @Inject + public FederationSearcher(FederationConfig config, StrictContractsConfig strict, + ComponentRegistry targetSelectors) { + this(createResolver(config), strict.searchchains(), strict.propagateSourceProperties(), + resolveSelector(config.targetSelector(), targetSelectors)); + } + + private static TargetSelector resolveSelector(String selectorId, ComponentRegistry targetSelectors) { + if (selectorId.isEmpty()) + return null; + + return checkNotNull( + targetSelectors.getComponent(selectorId), + "Missing target selector with id" + quote(selectorId)); + } + + //for testing + public FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver) { + this(searchChainResolver, false, PropagateSourceProperties.ALL, null); + } + + private FederationSearcher(SearchChainResolver searchChainResolver, boolean strictSearchchain, + PropagateSourceProperties.Enum propagateSourceProperties, + TargetSelector targetSelector) { + this.searchChainResolver = searchChainResolver; + sourceRefResolver = new SourceRefResolver(searchChainResolver); + this.strictSearchchain = strictSearchchain; + this.propagateSourceProperties = propagateSourceProperties; + this.targetSelector = targetSelector; + } + + + private static SearchChainResolver createResolver(FederationConfig config) { + SearchChainResolver.Builder builder = new SearchChainResolver.Builder(); + + for (FederationConfig.Target target : config.target()) { + boolean isDefaultProviderForSource = true; + + for (FederationConfig.Target.SearchChain searchChain : target.searchChain()) { + if (searchChain.providerId() == null || searchChain.providerId().isEmpty()) { + addSearchChain(builder, target, searchChain); + } else { + addSourceForProvider(builder, target, searchChain, isDefaultProviderForSource); + isDefaultProviderForSource = false; + } + } + + //Allow source groups to use by default. + if (target.useByDefault()) + builder.useTargetByDefault(target.id()); + } + + return builder.build(); + } + + private static void addSearchChain(SearchChainResolver.Builder builder, + FederationConfig.Target target, FederationConfig.Target.SearchChain searchChain) { + if (!target.id().equals(searchChain.searchChainId())) + throw new RuntimeException("Invalid federation config, " + target.id() + " != " + searchChain.searchChainId()); + + builder.addSearchChain(ComponentId.fromString(searchChain.searchChainId()), + federationOptions(searchChain), searchChain.documentTypes()); + } + + private static void addSourceForProvider(SearchChainResolver.Builder builder, FederationConfig.Target target, + FederationConfig.Target.SearchChain searchChain, boolean isDefaultProvider) { + builder.addSourceForProvider( + ComponentId.fromString(target.id()), + ComponentId.fromString(searchChain.providerId()), + ComponentId.fromString(searchChain.searchChainId()), + isDefaultProvider, federationOptions(searchChain), + searchChain.documentTypes()); + } + + private static FederationOptions federationOptions(FederationConfig.Target.SearchChain searchChain) { + return new FederationOptions(). + setOptional(searchChain.optional()). + setUseByDefault(searchChain.useByDefault()). + setTimeoutInMilliseconds(searchChain.timeoutMillis()). + setRequestTimeoutInMilliseconds(searchChain.requestTimeoutMillis()); + } + + private static long calculateTimeout(Query query, List targets) { + + class PartitionByOptional { + final List mandatoryTargets; + final List optionalTargets; + + PartitionByOptional(List targets) { + List mandatoryTargets = new ArrayList<>(); + List optionalTargets = new ArrayList<>(); + + for (TargetHandler target : targets) { + if (target.federationOptions().getOptional()) { + optionalTargets.add(target); + } else { + mandatoryTargets.add(target); + } + } + + this.mandatoryTargets = Collections.unmodifiableList(mandatoryTargets); + this.optionalTargets = Collections.unmodifiableList(optionalTargets); + } + } + + if (query.requestHasProperty("timeout") || targets.isEmpty()) { + return query.getTimeLeft(); + } else { + PartitionByOptional partition = new PartitionByOptional(targets); + long queryTimeout = query.getTimeout(); + + return partition.mandatoryTargets.isEmpty() ? + maximumTimeout(partition.optionalTargets, queryTimeout) : + maximumTimeout(partition.mandatoryTargets, queryTimeout); + } + } + + private static long maximumTimeout(List invocationSpecs, long queryTimeout) { + long timeout = 0; + for (TargetHandler target : invocationSpecs) { + timeout = Math.max(timeout, + target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout)); + } + return timeout; + } + + private void addSearchChainTimedOutError(Query query, + ComponentId searchChainId) { + ErrorMessage timeoutMessage= + ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out."); + timeoutMessage.setSource(searchChainId.stringValue()); + query.errors().add(timeoutMessage); + } + + private void mergeResult(Query query, TargetHandler targetHandler, + Result mergedResults, Result result) { + + + targetHandler.modifyTargetResult(result); + final ComponentId searchChainId = targetHandler.getId(); + Chain searchChain = targetHandler.getChain(); + + mergedResults.mergeWith(result); + HitGroup group = result.hits(); + group.setId("source:" + searchChainId.getName()); + + group.setSearcherSpecificMetaData(this, searchChain); + group.setMeta(false); // Set hit groups as non-meta as a default + group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count + group.setSource(searchChainId.getName()); + group.setQuery(result.getQuery()); + + for (Iterator it = group.unorderedDeepIterator(); it.hasNext();) { + Hit hit = it.next(); + hit.setSearcherSpecificMetaData(this, searchChain); + hit.setSource(searchChainId.stringValue()); + + // This is the backend request meta hit, that is holding logging information + // See HTTPBackendSearcher, where this hit is created + if (hit.isMeta() && hit.types().contains("logging")) { + // Augment this hit with count fields + hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount()); + hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount()); + int offset = result.getQuery().getOffset(); + hit.setField(LOG_COUNT_PREFIX + "first", offset + 1); + hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset); + } + + } + if (query.getTraceLevel()>=4) + query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4); + mergedResults.hits().add(group); + } + + private boolean successfullyCompleted(FutureResult result) { + return result.isDone() && !result.isCancelled(); + } + + private Query setupSingleQuery(Query query, long timeout, TargetHandler targetHandler) { + if (strictSearchchain) { + query.resetTimeout(); + return setupFederationQuery(query, query, + windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler); + } else { + return cloneFederationQuery(query, + windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler); + } + } + + private Result startExecuteSingleQuery(Query query, TargetHandler chain, long timeout, Execution execution) { + Query outgoing = setupSingleQuery(query, timeout, chain); + Execution exec = new Execution(chain.getChain(), execution.context()); + return exec.search(outgoing); + } + + private List startExecuteQueryForEachTarget( + Query query, Collection targets, long timeout, Execution execution) { + + List results = new ArrayList<>(); + + Map windowParameters; + if (targets.size()==1) // preserve requested top-level offset by default as an optimization + windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits(), query.getOffset())); + else // request from offset 0 to enable correct upstream blending into a single top-level hit list + windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits() + query.getOffset(), 0)); + + for (TargetHandler targetHandler : targets) { + long executeTimeout = timeout; + if (targetHandler.federationOptions().getRequestTimeoutInMilliseconds() != -1) + executeTimeout = targetHandler.federationOptions().getRequestTimeoutInMilliseconds(); + results.add(new ExecutionInfo(targetHandler, targetHandler.federationOptions(), + createFutureSearch(query, windowParameters, targetHandler, executeTimeout, execution))); + } + + return results; + } + + private Map windowParameters(int hits, int offset) { + Map params = new HashMap<>(); + params.put(Query.HITS.toString(), hits); + params.put(Query.OFFSET.toString(), offset); + return params; + } + + private FutureResult createFutureSearch(Query query, Map windowParameters, TargetHandler targetHandler, + long timeout, Execution execution) { + Query clonedQuery = cloneFederationQuery(query, windowParameters, timeout, targetHandler); + return new AsyncExecution(targetHandler.getChain(), execution).search(clonedQuery); + } + + + private Query cloneFederationQuery(Query query, + Map windowParameters, long timeout, TargetHandler targetHandler) { + Query clonedQuery = Query.createNewQuery(query); + return setupFederationQuery(query, clonedQuery, windowParameters, timeout, targetHandler); + } + + private Query setupFederationQuery(Query query, Query outgoing, + Map windowParameters, long timeout, TargetHandler targetHandler) { + + ComponentId chainId = targetHandler.getChain().getId(); + + String sourceName = chainId.getName(); + outgoing.properties().set(SOURCENAME, sourceName); + String providerName = chainId.getName(); + if (chainId.getNamespace() != null) + providerName = chainId.getNamespace().getName(); + outgoing.properties().set(PROVIDERNAME, providerName); + + outgoing.setTimeout(timeout); + + switch (propagateSourceProperties) { + case ALL: + propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName, + QueryProperties.PER_SOURCE_QUERY_PROPERTIES); + break; + case OFFSET_HITS: + propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName, + new CompoundName[]{Query.OFFSET, Query.HITS}); + break; + } + + //TODO: FederationTarget + //TODO: only for target produced by this, not others + targetHandler.modifyTargetQuery(outgoing); + return outgoing; + } + + private void propagatePerSourceQueryProperties(Query original, Query outgoing, + Map windowParameters, + String sourceName, String providerName, + CompoundName[] queryProperties) { + + for (CompoundName key : queryProperties) { + Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, windowParameters.get(key.toString())); + if (value != null) { + outgoing.properties().set(key, value); + } + } + } + + private Object getSourceOrProviderProperty(Query query, CompoundName propertyName, + String sourceName, String providerName, + Object defaultValue) { + Object result = getProperty(query, new SourceKey(sourceName, propertyName.toString())); + if (result == null) + result = getProperty(query, new ProviderKey(providerName, propertyName.toString())); + if (result == null) + result = defaultValue; + + return result; + } + + private Object getProperty(Query query, CompoundKey key) { + + CompoundName name = map.get(key); + if (name == null) { + name = new CompoundName(key.toString()); + map.put(key, name); + } + return query.properties().get(name); + } + + private ErrorMessage missingSearchChainsErrorMessage(List unresolvedSearchChainExceptions) { + StringBuilder sb = new StringBuilder(); + sb.append(StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' ')); + + + sb.append(" Valid source refs are "); + sb.append( + StringUtils.join(allSourceRefDescriptions().iterator(), + ", ")).append('.'); + + return ErrorMessage.createInvalidQueryParameter(sb.toString()); + } + + private List allSourceRefDescriptions() { + List descriptions = new ArrayList<>(); + + for (Target target : searchChainResolver.allTopLevelTargets()) { + descriptions.add(target.searchRefDescription()); + } + return descriptions; + } + + private Set getMessagesSet(List unresolvedSearchChainExceptions) { + Set messages = new LinkedHashSet<>(); + for (UnresolvedSearchChainException exception : unresolvedSearchChainExceptions) { + messages.add(exception.getMessage()); + } + return messages; + } + + private void warnIfUnresolvedSearchChains(List missingTargets, + HitGroup errorHitGroup) { + + if (!missingTargets.isEmpty()) { + errorHitGroup.addError(missingSearchChainsErrorMessage(missingTargets)); + } + } + + @Override + public Collection getSearchChainsForwarded(SearchChainRegistry registry) { + List searchChains = new ArrayList<>(); + + for (Target target : searchChainResolver.allTopLevelTargets()) { + if (target instanceof SourcesTarget) { + searchChains.addAll(commentedSourceProviderSearchChains((SourcesTarget)target, registry)); + } else if (target instanceof SingleTarget) { + searchChains.add(commentedSearchChain((SingleTarget)target, registry)); + } else { + log.warning("Invalid target type " + target.getClass().getName()); + } + } + + return searchChains; + } + + private CommentedSearchChain commentedSearchChain(SingleTarget singleTarget, SearchChainRegistry registry) { + return new CommentedSearchChain("If source refs contains '" + singleTarget.getId() + "'.", + registry.getChain(singleTarget.getId())); + } + + private List commentedSourceProviderSearchChains(SourcesTarget sourcesTarget, + SearchChainRegistry registry) { + + List commentedSearchChains = new ArrayList<>(); + String ifMatchingSourceRefPrefix = "If source refs contains '" + sourcesTarget.getId() + "' and provider is '"; + + commentedSearchChains.add( + new CommentedSearchChain(ifMatchingSourceRefPrefix + sourcesTarget.defaultProviderSource().provider + + "'(or not given).", registry.getChain(sourcesTarget.defaultProviderSource().searchChainId))); + + for (SearchChainInvocationSpec providerSource : sourcesTarget.allProviderSources()) { + if (!providerSource.equals(sourcesTarget.defaultProviderSource())) { + commentedSearchChains.add( + new CommentedSearchChain(ifMatchingSourceRefPrefix + providerSource.provider + "'.", + registry.getChain(providerSource.searchChainId))); + } + } + return commentedSearchChains; + } + + /** Returns the set of properties set for the source or provider given in the query (if any). + * + * If the query has not set sourceName or providerName, null will be returned */ + public static Properties getSourceProperties(Query query) { + String sourceName = query.properties().getString(SOURCENAME); + String providerName = query.properties().getString(PROVIDERNAME); + if (sourceName == null || providerName == null) + return null; + Properties sourceProperties = new SubProperties("source." + sourceName, query.properties()); + Properties providerProperties = new SubProperties("provider." + providerName, query.properties()); + sourceProperties.chain(providerProperties); + return sourceProperties; + } + + @Override + public void fill(final Result result, final String summaryClass, Execution execution) { + List filledResults = new ArrayList<>(); + UniqueExecutionsToResults uniqueExecutionsToResults = new UniqueExecutionsToResults(); + addResultsToFill(result.hits(), result, summaryClass, uniqueExecutionsToResults); + final Set, Map>> resultsForAllChains = uniqueExecutionsToResults.resultsToFill + .entrySet(); + int numberOfCallsToFillNeeded = 0; + + for (Entry, Map> resultsToFillForAChain : resultsForAllChains) { + numberOfCallsToFillNeeded += resultsToFillForAChain.getValue().size(); + } + + for (Entry, Map> resultsToFillForAChain : resultsForAllChains) { + Chain chain = resultsToFillForAChain.getKey(); + Execution chainExecution = (chain == null) ? execution : new Execution(chain, execution.context()); + + for (Entry resultsToFillForAChainAndQuery : resultsToFillForAChain.getValue().entrySet()) { + Result resultToFill = resultsToFillForAChainAndQuery.getValue(); + if (numberOfCallsToFillNeeded == 1) { + chainExecution.fill(resultToFill, summaryClass); + propagateErrors(resultToFill, result); + } else { + AsyncExecution asyncFill = new AsyncExecution(chainExecution); + filledResults.add(asyncFill.fill(resultToFill, summaryClass)); + } + } + } + for (FutureResult filledResult : filledResults) { + propagateErrors(filledResult.get(result.getQuery().getTimeLeft(), TimeUnit.MILLISECONDS), result); + } + } + + private void propagateErrors(Result source, Result destination) { + ErrorMessage error = source.hits().getError(); + if (error != null) + destination.hits().addError(error); + } + + /** A map from a unique search chain and query instance to a result */ + private static class UniqueExecutionsToResults { + + /** Implemented as a nested identity hashmap */ + final Map,Map> resultsToFill = new IdentityHashMap<>(); + + /** Returns a result to fill for a query and chain, by creating it if necessary */ + public Result get(Chain chain, Query query) { + Map resultsToFillForAChain = resultsToFill.get(chain); + if (resultsToFillForAChain == null) { + resultsToFillForAChain = new IdentityHashMap<>(); + resultsToFill.put(chain,resultsToFillForAChain); + } + + Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query); + if (resultsToFillForAChainAndQuery == null) { + resultsToFillForAChainAndQuery = new Result(query); + resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery); + } + + return resultsToFillForAChainAndQuery; + } + + } + + private void addResultsToFill(HitGroup hitGroup, Result result, String summaryClass, + UniqueExecutionsToResults uniqueExecutionsToResults) { + for (Hit hit : hitGroup) { + if (hit instanceof HitGroup) { + addResultsToFill((HitGroup) hit, result, summaryClass, uniqueExecutionsToResults); + } else { + if ( ! hit.isFilled(summaryClass)) + getSearchChainGroup(hit,result,uniqueExecutionsToResults).hits().add(hit); + } + } + } + + private Result getSearchChainGroup(Hit hit, Result result, UniqueExecutionsToResults uniqueExecutionsToResults) { + @SuppressWarnings("unchecked") + Chain chain = (Chain) hit.getSearcherSpecificMetaData(this); + Query query = hit.getQuery() !=null ? hit.getQuery() : result.getQuery(); + + return uniqueExecutionsToResults.get(chain,query); + } + + private void searchMultipleTargets(Query query, Result mergedResults, + Collection targets, + long timeout, + Execution execution) { + + List executionInfos = startExecuteQueryForEachTarget(query, targets, timeout, execution); + waitForMandatoryTargets(executionInfos, query.getTimeout()); + + HitOrderer s=null; + for (ExecutionInfo executionInfo : executionInfos) { + if ( ! successfullyCompleted(executionInfo.futureResult)) { + addSearchChainTimedOutError(query, executionInfo.targetHandler.getId()); + } else { + if (s == null) { + s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), executionInfo.futureResult.get().hits().getOrderer()); + } + mergeResult(query, executionInfo.targetHandler, mergedResults, executionInfo.futureResult.get()); + + } + } + } + + /** + * TODO This is probably a dirty hack for bug 4711376. There are probably better ways. + * But I will leave that to trd-processing@ + * + * @param group The merging hitgroup to be updated if necessary + * @param orderer The per provider hit orderer. + * @return The hitorderer chosen + */ + private HitOrderer dirtyCopyIfModifiedOrderer(HitGroup group, HitOrderer orderer) { + if (orderer != null) { + HitOrderer old = group.getOrderer(); + if ((old == null) || ! orderer.equals(old)) { + group.setOrderer(orderer); + } + } + + return orderer; + } + + private void waitForMandatoryTargets(List executionInfos, long queryTimeout) { + FutureWaiter futureWaiter = new FutureWaiter(); + + boolean hasMandatoryTargets = false; + for (ExecutionInfo executionInfo : executionInfos) { + if (isMandatory(executionInfo)) { + futureWaiter.add(executionInfo.futureResult, + getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout)); + hasMandatoryTargets = true; + } + } + + if (!hasMandatoryTargets) { + for (ExecutionInfo executionInfo : executionInfos) { + futureWaiter.add(executionInfo.futureResult, + getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout)); + } + } + + futureWaiter.waitForFutures(); + } + + private long getSearchChainExecutionTimeoutInMilliseconds(ExecutionInfo executionInfo, long queryTimeout) { + return executionInfo.federationOptions. + getSearchChainExecutionTimeoutInMilliseconds(queryTimeout); + } + + private boolean isMandatory(ExecutionInfo executionInfo) { + return !executionInfo.federationOptions.getOptional(); + } + + private void searchSingleTarget(Query query, Result mergedResults, + TargetHandler targetHandler, + long timeout, + Execution execution) { + Result result = startExecuteSingleQuery(query, targetHandler, timeout, execution); + mergeResult(query, targetHandler, mergedResults, result); + } + + + private Results getTargets(Set sources, Properties properties, IndexFacts indexFacts) { + return sources.isEmpty() ? + defaultSearchChains(properties): + resolveSources(sources, properties, indexFacts); + } + + private Results resolveSources(Set sources, Properties properties, IndexFacts indexFacts) { + Results.Builder result = new Builder<>(); + + for (String source : sources) { + try { + result.addAllData(sourceRefResolver.resolve(asSourceSpec(source), properties, indexFacts)); + } catch (UnresolvedSearchChainException e) { + result.addError(e); + } + } + + return result.build(); + } + + + public Results defaultSearchChains(Properties sourceToProviderMap) { + Results.Builder result = new Builder<>(); + + for (Target target : searchChainResolver.defaultTargets()) { + try { + result.addData(target.responsibleSearchChain(sourceToProviderMap)); + } catch (UnresolvedSearchChainException e) { + result.addError(e); + } + } + + return result.build(); + } + + + private ComponentSpecification asSourceSpec(String source) { + try { + return new ComponentSpecification(source); + } catch(Exception e) { + throw new IllegalArgumentException("The source ref '" + source + + "' used for federation is not valid.", e); + } + } + + @Override + public Result search(Query query, Execution execution) { + Result mergedResults = execution.search(query); + + Results targets = + getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts()); + warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits()); + + Collection prunedTargets = + pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data()); + + Results regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry()); + query.errors().addAll(regularTargetHandlers.errors()); + + List targetHandlers = new ArrayList<>(regularTargetHandlers.data()); + targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector)); + + final long targetsTimeout = calculateTimeout(query, targetHandlers); + if (targetsTimeout < 0) + return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate")); + + traceTargets(query, targetHandlers); + + if (targetHandlers.size() == 0) { + return mergedResults; + } else if (targetHandlers.size() == 1 && ! shouldExecuteTargetLongerThanThread(query, targetHandlers.get(0))) { + TargetHandler chain = first(targetHandlers); + searchSingleTarget(query, mergedResults, chain, targetsTimeout, execution); + } else { + searchMultipleTargets(query, mergedResults, targetHandlers, targetsTimeout, execution); + } + + return mergedResults; + } + + private void traceTargets(Query query, List targetHandlers) { + final int traceFederationLevel = 2; + if ( ! query.isTraceable(traceFederationLevel)) return; + query.trace("Federating to " + targetHandlers, traceFederationLevel); + } + + /** + * Returns true if we are requested to keep executing a target longer than we're waiting for it. + * This is useful to populate caches inside targets. + */ + private boolean shouldExecuteTargetLongerThanThread(Query query, TargetHandler target) { + return target.federationOptions().getRequestTimeoutInMilliseconds() > query.getTimeout(); + } + + private static Results resolveSearchChains( + Collection prunedTargets, + SearchChainRegistry registry) { + + Results.Builder targetHandlers = new Results.Builder<>(); + + for (SearchChainInvocationSpec target: prunedTargets) { + Chain chain = registry.getChain(target.searchChainId); + if (chain == null) { + targetHandlers.addError(ErrorMessage.createIllegalQuery( + "Could not find search chain '" + target.searchChainId + "'")); + } else { + targetHandlers.addData(new StandardTargetHandler(target, chain)); + } + } + + return targetHandlers.build(); + } + + private static List getAdditionalTargets(Query query, Execution execution, TargetSelector targetSelector) { + if (targetSelector == null) + return Collections.emptyList(); + + ArrayList result = new ArrayList<>(); + for (FederationTarget target: targetSelector.getTargets(query, execution.searchChainRegistry())) + result.add(new CustomTargetHandler<>(targetSelector, target)); + + return result; + } + + private Collection pruneTargetsWithoutDocumentTypes(Set restrict, List targets) { + if (restrict.isEmpty()) + return targets; + + Collection prunedTargets = new ArrayList<>(); + + for (SearchChainInvocationSpec target : targets) { + if (target.documentTypes.isEmpty() || documentTypeIntersectionIsNonEmpty(restrict, target)) + prunedTargets.add(target); + } + + return prunedTargets; + } + + private boolean documentTypeIntersectionIsNonEmpty(Set restrict, SearchChainInvocationSpec target) { + for (String documentType : target.documentTypes) { + if (restrict.contains(documentType)) + return true; + } + + return false; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java new file mode 100644 index 00000000000..b43798113de --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java @@ -0,0 +1,106 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation; + +import com.yahoo.component.ComponentSpecification; +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.dependencies.After; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.Searcher; +import com.yahoo.search.cluster.PingableSearcher; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.HitGroup; +import com.yahoo.search.searchchain.Execution; + +/** + * A lightweight searcher to forward all incoming requests to a single search + * chain defined in config. An alternative to federation searcher when standard + * semantics are not necessary for the application. + * + * @see FederationSearcher + * @since 5.0.13 + * @author Steinar Knutsen + */ +@After("*") +public class ForwardingSearcher extends PingableSearcher { + private final ComponentSpecification target; + + public ForwardingSearcher(final SearchchainForwardConfig config) { + if (config.target() == null) { + throw new RuntimeException( + "Configuration value searchchain-forward.target was null."); + } + try { + target = new ComponentSpecification(config.target()); + } catch (RuntimeException e) { + throw new RuntimeException( + "Failed constructing the component specification from searchchain-forward.target: " + + config.target(), e); + } + } + + @Override + public Result search(final Query query, final Execution execution) { + Execution next = createForward(execution); + + if (next == null) { + return badResult(query); + } else { + return next.search(query); + } + } + + private Result badResult(final Query query) { + final ErrorMessage error = noSearchchain(); + return new Result(query, error); + } + + @Override + public Pong ping(final Ping ping, final Execution execution) { + Execution next = createForward(execution); + + if (next == null) { + return badPong(); + } else { + return next.ping(ping); + } + } + + private Pong badPong() { + final Pong pong = new Pong(); + pong.addError(noSearchchain()); + return pong; + } + + @Override + public void fill(final Result result, final String summaryClass, + final Execution execution) { + Execution next = createForward(execution); + if (next == null) { + badFill(result.hits()); + return; + } else { + next.fill(result, summaryClass); + } + } + + private void badFill(HitGroup hits) { + hits.addError(noSearchchain()); + } + + private Execution createForward(Execution execution) { + Chain targetChain = execution.context().searchChainRegistry() + .getComponent(target); + if (targetChain == null) { + return null; + } + return new Execution(targetChain, execution.context()); + } + + private ErrorMessage noSearchchain() { + return ErrorMessage + .createServerIsMisconfigured("Could not get search chain matching component specification: " + target); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java new file mode 100644 index 00000000000..52cd5397489 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java @@ -0,0 +1,58 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation; + +import com.yahoo.search.searchchain.FutureResult; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author tonytv + */ +class FutureWaiter { + private class Future { + final FutureResult result; + final long timeoutInMilliseconds; + + public Future(FutureResult result, long timeoutInMilliseconds) { + this.result = result; + this.timeoutInMilliseconds = timeoutInMilliseconds; + } + } + + private List futures = new ArrayList<>(); + + public void add(FutureResult futureResult, long timeoutInMilliseconds) { + futures.add(new Future(futureResult, timeoutInMilliseconds)); + } + + public void waitForFutures() { + sortFuturesByTimeoutDescending(); + + final long startTime = System.currentTimeMillis(); + + for (Future future : futures) { + long timeToWait = startTime + future.timeoutInMilliseconds - System.currentTimeMillis(); + if (timeToWait <= 0) + break; + + future.result.get(timeToWait, TimeUnit.MILLISECONDS); + } + } + + private void sortFuturesByTimeoutDescending() { + Collections.sort(futures, new Comparator() { + @Override + public int compare(Future lhs, Future rhs) { + return -compareLongs(lhs.timeoutInMilliseconds, rhs.timeoutInMilliseconds); + } + + private int compareLongs(long lhs, long rhs) { + return new Long(lhs).compareTo(rhs); + } + }); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java b/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java new file mode 100644 index 00000000000..8b7e8a1d9d5 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation; + +/** + * Thrown on timeouts + * + * @author Jon Bratseth + */ +@SuppressWarnings("serial") +public class TimeoutException extends RuntimeException { + + public TimeoutException(String message) { + super(message); + } + + public TimeoutException(String message,Throwable cause) { + super(message,cause); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java new file mode 100644 index 00000000000..576c16f68db --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import java.util.Collections; + +import com.yahoo.component.ComponentId; +import com.yahoo.search.federation.ProviderConfig; +import com.yahoo.search.Result; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.statistics.Statistics; + + +/** + * Superclass for http client searchers which depends on config. All this is doing is translating + * the provider and cache configurations to parameters which are passed upwards. + * + * @author Jon Bratseth + */ +public abstract class ConfiguredHTTPClientSearcher extends HTTPClientSearcher { + + /** Create this from a configuraton */ + public ConfiguredHTTPClientSearcher(final ComponentId id, final ProviderConfig providerConfig, Statistics manager) { + super(id, ConfiguredSearcherHelper.toConnectionList(providerConfig), new HTTPParameters(providerConfig), manager); + } + + /** Create an instance from direct parameters having a single connection. Useful for testing */ + public ConfiguredHTTPClientSearcher(String idString,String host,int port,String path, Statistics manager) { + super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),path, manager); + } + + /** Forwards to the next in chain fill(result,summaryName) */ + public @Override void fill(Result result,String summaryName, Execution execution,Connection connection) { + execution.fill(result,summaryName); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java new file mode 100644 index 00000000000..25253f768bd --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import com.yahoo.component.ComponentId; +import com.yahoo.search.federation.ProviderConfig; +import com.yahoo.search.cache.QrBinaryCacheConfig; +import com.yahoo.search.cache.QrBinaryCacheRegionConfig; +import com.yahoo.search.Result; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.statistics.Statistics; + +import java.util.Collections; + + +/** + * Superclass for http provider searchers which depends on config. All this is doing is translating + * the provider and cache configurations to parameters which are passed upwards. + * + * @author Arne Bergene Fossaa + * @author bratseth + */ +public abstract class ConfiguredHTTPProviderSearcher extends HTTPProviderSearcher { + + /** Create this from a configuraton */ + public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig, Statistics manager) { + super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),new HTTPParameters(providerConfig), manager); + } + + /** Create this from a configuraton */ + public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig, + HTTPParameters parameters, Statistics manager) { + super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),parameters, manager); + } + + /** Create this from a configuraton with a configured cache */ + public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig, + final QrBinaryCacheConfig cacheConfig, + final QrBinaryCacheRegionConfig regionConfig, Statistics manager) { + super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),new HTTPParameters(providerConfig), manager); + configureCache(cacheConfig,regionConfig); + } + + /** Create this from a configuraton with a configured cache */ + public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig, + final QrBinaryCacheConfig cacheConfig, + final QrBinaryCacheRegionConfig regionConfig, HTTPParameters parameters, Statistics manager) { + super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),parameters, manager); + configureCache(cacheConfig,regionConfig); + } + + /** Create an instance from direct parameters having a single connection. Useful for testing */ + public ConfiguredHTTPProviderSearcher(String idString,String host,int port,String path, Statistics manager) { + super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),path, manager); + } + + /** Create an instance from direct parameters having a single connection. Useful for testing */ + public ConfiguredHTTPProviderSearcher(String idString,String host,int port,HTTPParameters parameters, Statistics manager) { + super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),parameters, manager); + } + + /** + * Override this to provider multi-phase result filling towards a backend. + * This default implementation does nothing. + */ + public @Override void fill(Result result,String summaryName, Execution execution,Connection connection) { + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java new file mode 100644 index 00000000000..8d3ee016b4f --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import java.util.ArrayList; +import java.util.List; + +import com.yahoo.search.federation.ProviderConfig; + +/** + * Some static helper classes for configured*Searcher classes + * + * @author Jon Bratseth + */ +class ConfiguredSearcherHelper { + + /** No instantiation */ + private ConfiguredSearcherHelper() { } + + public static List toConnectionList(ProviderConfig providerConfig) { + List connections=new ArrayList<>(); + for(ProviderConfig.Node node : providerConfig.node()) { + connections.add(new Connection(node.host(), node.port())); + } + return connections; + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java b/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java new file mode 100644 index 00000000000..88e2c6ad0a0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +/** + * Represents a connection to a particular node (host/port). + * Right now this is just a container of connection parameters, but might be extended to + * contain an open connection later. + * The host and port state is immutable. + * + * @author Jon Bratseth + */ +public class Connection { + + private String host; + private int port; + + public Connection(String host,int port) { + this.host=host; + this.port=port; + } + + public String getHost() { return host; } + + public int getPort() { return port; } + + public String toString() { + return "http connection '" + host + ":" + port + "'"; + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java b/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java new file mode 100644 index 00000000000..1dc58ecd65e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java @@ -0,0 +1,125 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import org.apache.http.HttpEntity; +import org.apache.http.entity.HttpEntityWrapper; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.zip.GZIPInputStream; + +/** + * Used by HTTPSearcher when talking to services returning compressed content. + * + * @author Mainak Mandal + */ +public class GzipDecompressingEntity extends HttpEntityWrapper { + + private static class Resources { + + byte [] buffer; + int total; + + Resources() { + total = 0; + buffer = new byte[65536]; + } + void drain(InputStream zipStream) throws IOException { + int numRead = zipStream.read(buffer, total, buffer.length); + while (numRead != -1) { + total += numRead; + if ((total + 65536) > buffer.length) { + buffer = Arrays.copyOf(buffer, buffer.length + numRead); + } + numRead = zipStream.read(buffer, total, buffer.length - total); + } + } + + } + + private final Resources resources = new Resources(); + + public GzipDecompressingEntity(final HttpEntity entity) throws IllegalStateException, IOException { + super(entity); + GZIPInputStream gz = new GZIPInputStream(entity.getContent()); + InputStream zipStream = new BufferedInputStream(gz); + try { + resources.drain(zipStream); + } catch (IOException e) { + throw e; + } finally { + zipStream.close(); + } + } + + @Override + public InputStream getContent() throws IOException, IllegalStateException { + + final ByteBuffer buff = ByteBuffer.wrap(resources.buffer, 0, resources.total); + return new InputStream() { + + @Override + public int available() throws IOException { + return buff.remaining(); + } + + @Override + public int read() throws IOException { + if (buff.hasRemaining()) + return buff.get() & 0xFF; + + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + if (!buff.hasRemaining()) + return -1; + + int len = b.length; + if (len > buff.remaining()) + len = buff.remaining(); + buff.get(b, 0, len); + return len; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (!buff.hasRemaining()) + return -1; + + if (len > buff.remaining()) + len = buff.remaining(); + buff.get(b, off, len); + return len; + } + + @Override + public long skip(long n) throws IOException { + if (!buff.hasRemaining()) + return -1; + + if (n > buff.remaining()) + n = buff.remaining(); + + buff.position(buff.position() + (int) n); + return n; + } + }; + } + + @Override + public long getContentLength() { + return resources.total; + } + + @Override + public void writeTo(OutputStream outstream) throws IOException { + outstream.write(resources.buffer, 0, resources.total); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java new file mode 100644 index 00000000000..1459fb6f226 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java @@ -0,0 +1,276 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import com.yahoo.component.ComponentId; +import com.yahoo.jdisc.http.CertificateStore; +import com.yahoo.yolean.Exceptions; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.statistics.Statistics; + +import org.apache.http.HttpEntity; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +/** + * A utility parent for searchers which gets data from web services which is incorporated into the query. + * This searcher will take care of implementing the search method while the extending class implements + * {@link #getQueryMap} and {@link #handleResponse} to create the http request and handle the response, respectively. + * + *

This class automatically adds a meta hit containing latency and other + * meta information about the obtained HTTP data using createRequestMeta(). + * The fields available in the hit are:

+ * + *
+ * HTTPSearcher.LOG_LATENCY_START + *
+ * The latency of the external provider answering a request. + *
+ * HTTPSearcher.LOG_LATENCY_FINISH + *
+ * Total time of the HTTP traffic, but also decoding of the data, is this + * happens at the same time. + *
+ * HTTPSearcher.LOG_URI + *
+ * The complete URI used for external service. + *
+ * HTTPSearcher.LOG_SCHEME + *
+ * The scheme of the request URI sent. + *
+ * HTTPSearcher.LOG_HOST + *
+ * The host used for the request URI sent. + *
+ * HTTPSearcher.LOG_PORT + *
+ * The port used for the request URI sent. + *
+ * HTTPSearcher.LOG_PATH + *
+ * Path element of the request URI sent. + *
+ * HTTPSearcher.LOG_STATUS + *
+ * Status code of the HTTP response. + *
+ * HTTPSearcher.LOG_PROXY_TYPE + *
+ * The proxy type used, if any. Default is "http". + *
+ * HTTPSearcher.LOG_PROXY_HOST + *
+ * The proxy host, if any. + *
+ * HTTPSearcher.LOG_PROXY_PORT + *
+ * The proxy port, if any. + *
+ * HTTPSearcher.LOG_HEADER_PREFIX prepended to request header field name + *
+ * The content of any additional request header fields. + *
+ * HTTPSearcher.LOG_RESPONSE_HEADER_PREFIX prepended to response header field name + *
+ * The content of any additional response header fields. + *
+ + * @author Arne Bergene Fossaa + * @author bratseth + */ +public abstract class HTTPClientSearcher extends HTTPSearcher { + + static final CompoundName REQUEST_META_CARRIER = new CompoundName("com.yahoo.search.federation.http.HTTPClientSearcher_requestMeta"); + + protected final static Logger log = Logger.getLogger(HTTPClientSearcher.class.getName()); + + /** + * Creates a client searcher + * + * @param id the id of this instance + * @param connections the connections this will load balance and fail over between + * @param path the path portion of the url to be used + */ + public HTTPClientSearcher(ComponentId id, List connections,String path,Statistics statistics) { + super(id, connections, path, statistics); + } + + public HTTPClientSearcher(ComponentId id, List connections,String path,Statistics statistics, + CertificateStore certificateStore) { + super(id, connections, path, statistics, certificateStore); + } + + public HTTPClientSearcher(ComponentId id, List connections, HTTPParameters parameters, Statistics statistics) { + super(id, connections, parameters, statistics); + } + /** + * Creates a client searcher + * + * @param id the id of this instance + * @param connections the connections this will load balance and fail over between + * @param parameters the parameters to use when making http calls + * @param certificateStore the certificate store to use to pass certificates in requests + */ + public HTTPClientSearcher(ComponentId id, List connections, HTTPParameters parameters, + Statistics statistics, CertificateStore certificateStore) { + super(id, connections, parameters, statistics, certificateStore); + } + + /** Overridden to avoid interfering with errors from nested searchers, which is inappropriate for a client */ + @Override + public Result robustSearch(Query query, Execution execution, Connection connection) { + return search(query,execution,connection); + } + + /** Implements a search towards the connection chosen by the cluster searcher for this query */ + @Override + public Result search(Query query, Execution execution, Connection connection) { + Hit requestMeta = doHttpRequest(query, connection); + Result result = execution.search(query); + result.hits().add(requestMeta); + return result; + } + + private Hit doHttpRequest(Query query, Connection connection) { + URI uri; + // Create default meta hit for holding logging information + Hit requestMeta = createRequestMeta(); + query.properties().set(REQUEST_META_CARRIER, requestMeta); + + query.trace("Created request information hit",false,9); + try { + uri = getURI(query, connection); + } catch (MalformedURLException e) { + query.errors().add(createMalformedUrlError(query,e)); + return requestMeta; + } catch (URISyntaxException e) { + query.errors().add(createMalformedUrlError(query,e)); + return requestMeta; + } + + HttpEntity entity; + try { + if (query.getTraceLevel()>=1) + query.trace("Fetching " + uri.toString(), false, 1); + entity = getEntity(uri, requestMeta, query); + } catch (IOException e) { + query.errors().add(ErrorMessage.createBackendCommunicationError( + "Error when trying to connect to HTTP backend in " + this + " using " + connection + " for " + + query + ": " + Exceptions.toMessageString(e))); + return requestMeta; + } catch (TimeoutException e) { + query.errors().add(ErrorMessage.createTimeout("HTTP traffic timed out in " + + this + " for " + query + ": " + e.getMessage())); + return requestMeta; + } + if (entity==null) { + query.errors().add(ErrorMessage.createBackendCommunicationError( + "No result from connecting to HTTP backend in " + this + " using " + connection + " for " + query)); + return requestMeta; + } + + try { + query = handleResponse(entity,query); + } + catch (IOException e) { + query.errors().add(ErrorMessage.createBackendCommunicationError( + "Error when trying to consume input in " + this + ": " + Exceptions.toMessageString(e))); + } finally { + cleanupHttpEntity(entity); + } + return requestMeta; + } + + /** Overrides to pass the query on to the next searcher */ + @Override + public Result search(Query query, Execution execution, ErrorMessage error) { + query.errors().add(error); + return execution.search(query); + } + + /** Do nothing on fill in client searchers */ + @Override + public void fill(Result result,String summaryClass,Execution execution,Connection connection) { + } + + /** + * Convenience hook for unmarshalling the response and adding the information to the query. + * Implement this or handleResponse(entity,query) in any subclass. + * This default implementation throws an exception. + * + * @param inputStream the stream containing the data from the http service + * @param contentLength the length of the content in the stream in bytes, or a negative number if not known + * @param query the current query, to which information from the stream should be added + * @return query the query to propagate down the chain. This should almost always be the + * query instance given as a parameter. + */ + public Query handleResponse(InputStream inputStream, long contentLength, Query query) throws IOException { + throw new UnsupportedOperationException("handleResponse must be implemented by " + this); + } + + /** + * Unmarshals the response and adds the resulting data to the given query. + * This default implementation calls + * return handleResponse(entity.getContent(), entity.getContentLength(), query); + * (and does some detailed query tracing). + * + * @param query the current query, to which information from the stream should be added + * @return query the query to propagate down the chain. This should almost always be the + * query instance given as a parameter. + */ + public Query handleResponse(HttpEntity entity, Query query) throws IOException { + long len = entity.getContentLength(); + if (query.getTraceLevel()>=4) + query.trace("Received " + len + " bytes response in " + this, false, 4); + query = handleResponse(entity.getContent(), len, query); + if (query.getTraceLevel()>=2) + query.trace("Handled " + len + " bytes response in " + this, false, 2); + return query; + } + + /** Never retry individual queries to clients for now */ + @Override + protected boolean shouldRetry(Query query,Result result) { return false; } + + /** + * numHits and offset should not be part of the cache key as cache supports + * partial read/write that is only one cache entry is maintained per query + * irrespective of the offset and numhits. + */ + public abstract Map getCacheKey(Query q); + + /** + * Adds all key-values starting by "service." + getClientName() in query.properties(). + * Returns the empty map if {@link #getServiceName} is not overridden. + */ + @Override + public Map getQueryMap(Query query) { + LinkedHashMap queryMap=new LinkedHashMap<>(); + if (getServiceName().isEmpty()) return queryMap; + + for (Map.Entry objectProperty : query.properties().listProperties("service." + getServiceName()).entrySet()) // TODO: Make more efficient using CompoundName + queryMap.put(objectProperty.getKey(),objectProperty.getValue().toString()); + return queryMap; + } + + /** + * Override this to return the name of the service this is a client of. + * This is used to look up service specific properties as service.getServiceName.serviceSpecificProperty. + * This default implementation returns "", which means service specific parameters will not be used. + */ + protected String getServiceName() { return ""; } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java new file mode 100644 index 00000000000..19fe1df3e2e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java @@ -0,0 +1,315 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import com.google.common.base.Preconditions; +import com.yahoo.search.federation.ProviderConfig.PingOption; +import org.apache.http.conn.params.ConnManagerParams; +import org.apache.http.conn.params.ConnPerRouteBean; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpConnectionParams; +import org.apache.http.params.HttpParams; + +import com.yahoo.search.federation.ProviderConfig; + +/** + * A set of parameters for talking to an http backend + * + * @author bratseth + */ +public final class HTTPParameters { + + public static final String RETRIES = "com.yahoo.search.federation.http.retries"; + + private boolean frozen=false; + + // All timing parameters below are in milliseconds + /** The url request path portion */ + private String path="/"; + private int connectionTimeout=2000; + private int readTimeout=5000; + private boolean persistentConnections=true; + private boolean enableProxy = false; + private String proxyHost = "localhost"; + private int proxyPort = 1080; + private String method = "GET"; + private String schema = "http"; + private String inputEncoding = "utf-8"; + private String outputEncoding = "utf-8"; + private int maxTotalConnections=10000; + private int maxConnectionsPerRoute=10000; + private int socketBufferSizeBytes=-1; + private int retries = 1; + private int configuredReadTimeout = -1; + private int configuredConnectionTimeout = -1; + private int connectionPoolTimeout = -1; + private String ycaProxy = null; + private int ycaPort = 0; + private String ycaApplicationId = null; + private boolean ycaUseProxy = false; + private long ycaTtl = 0L; + private long ycaRetry = 0L; + + private PingOption.Enum pingOption = PingOption.NORMAL; + + + private boolean followRedirects = true; + + public HTTPParameters() {} + + public HTTPParameters(String path) { + setPath(path); + } + + public HTTPParameters(ProviderConfig providerConfig) { + configuredReadTimeout = (int) (providerConfig.readTimeout() * 1000.0d); + configuredConnectionTimeout = (int) (providerConfig.connectionTimeout() * 1000.0d); + connectionPoolTimeout = (int) (providerConfig.connectionPoolTimeout() * 1000.0d); + retries = providerConfig.retries(); + setPath(providerConfig.path()); + ycaUseProxy = providerConfig.yca().useProxy(); + if (ycaUseProxy) { + ycaProxy = providerConfig.yca().host(); + ycaPort = providerConfig.yca().port(); + } + ycaApplicationId = providerConfig.yca().applicationId(); + ycaTtl = providerConfig.yca().ttl() * 1000L; + ycaRetry = providerConfig.yca().retry() * 1000L; + followRedirects = providerConfig.followRedirects(); + pingOption = providerConfig.pingOption(); + } + + /** + * Set the url path to use in queries to this. If the argument is null or empty the path is set to "/". + * If a leading "/" is missing, it is added automatically. + */ + public final void setPath(String path) { + if (path==null || path.isEmpty()) path="/"; + + if (! path.startsWith("/")) + path="/" + path; + this.path = path; + } + + public PingOption.Enum getPingOption() { + return pingOption; + } + + public void setPingOption(PingOption.Enum pingOption) { + Preconditions.checkNotNull(pingOption); + ensureNotFrozen(); + this.pingOption = pingOption; + } + + /** Returns the url path. Default is "/". */ + public String getPath() { return path; } + + public boolean getFollowRedirects() { + return followRedirects; + } + + public void setFollowRedirects(boolean followRedirects) { + ensureNotFrozen(); + this.followRedirects = followRedirects; + } + + + public void setConnectionTimeout(int connectionTimeout) { + ensureNotFrozen(); + this.connectionTimeout=connectionTimeout; + } + + /** Returns the connection timeout in milliseconds. Default is 2000. */ + public int getConnectionTimeout() { return connectionTimeout; } + + public void setReadTimeout(int readTimeout) { + ensureNotFrozen(); + this.readTimeout=readTimeout; + } + + /** Returns the read timeout in milliseconds. Default is 5000. */ + public int getReadTimeout() { return readTimeout; } + + /** + * Note: This is currently largely a noop: Connections are reused even when this is set to true. + * The setting will change from sharing connections between threads to only reusing it within a thread + * but it is still reused. + */ + public void setPersistentConnections(boolean persistentConnections) { + ensureNotFrozen(); + this.persistentConnections=persistentConnections; + } + + /** Returns whether this should use persistent connections. Default is true. */ + public boolean getPersistentConnections() { return persistentConnections; } + + /** Returns whether proxying should be enabled. Default is false. */ + public boolean getEnableProxy() { return enableProxy; } + + public void setEnableProxy(boolean enableProxy ) { + ensureNotFrozen(); + this.enableProxy=enableProxy; + } + + /** Returns the proxy type to use (if enabled). Default is "http". */ + public String getProxyType() { + return "http"; + } + + public void setProxyHost(String proxyHost) { + ensureNotFrozen(); + this.proxyHost=proxyHost; + } + + /** Returns the proxy host to use (if enabled). Default is "localhost". */ + public String getProxyHost() { return proxyHost; } + + public void setProxyPort(int proxyPort) { + ensureNotFrozen(); + this.proxyPort=proxyPort; + } + + /** Returns the proxy port to use (if enabled). Default is 1080. */ + public int getProxyPort() { return proxyPort; } + + public void setMethod(String method) { + ensureNotFrozen(); + this.method=method; + } + + /** Returns the http method to use. Default is "GET". */ + public String getMethod() { return method; } + + public void setSchema(String schema) { + ensureNotFrozen(); + this.schema=schema; + } + + /** Returns the schema to use. Default is "http". */ + public String getSchema() { return schema; } + + public void setInputEncoding(String inputEncoding) { + ensureNotFrozen(); + this.inputEncoding=inputEncoding; + } + + /** Returns the input encoding. Default is "utf-8". */ + public String getInputEncoding() { return inputEncoding; } + + public void setOutputEncoding(String outputEncoding) { + ensureNotFrozen(); + this.outputEncoding=outputEncoding; + } + + /** Returns the output encoding. Default is "utf-8". */ + public String getOutputEncoding() { return outputEncoding; } + + /** Make this unmodifiable. Note that any thread synchronization must be done outside this object. */ + public void freeze() { + frozen=true; + } + + private void ensureNotFrozen() { + if (frozen) throw new IllegalStateException("Cannot modify frozen " + this); + } + + /** + * Returns the eligible subset of this as a HttpParams snapshot + * AND configures the Apache HTTP library with the parameters of this + */ + public HttpParams toHttpParams() { + return toHttpParams(connectionTimeout, readTimeout); + } + + /** + * Returns the eligible subset of this as a HttpParams snapshot + * AND configures the Apache HTTP library with the parameters of this + */ + public HttpParams toHttpParams(int connectionTimeout, int readTimeout) { + HttpParams params = new BasicHttpParams(); + // force use of configured value if available + if (configuredConnectionTimeout > 0) { + HttpConnectionParams.setConnectionTimeout(params, configuredConnectionTimeout); + } else { + HttpConnectionParams.setConnectionTimeout(params, connectionTimeout); + } + if (configuredReadTimeout > 0) { + HttpConnectionParams.setSoTimeout(params, configuredReadTimeout); + } else { + HttpConnectionParams.setSoTimeout(params, readTimeout); + } + if (socketBufferSizeBytes > 0) { + HttpConnectionParams.setSocketBufferSize(params, socketBufferSizeBytes); + } + if (connectionPoolTimeout > 0) { + ConnManagerParams.setTimeout(params, connectionPoolTimeout); + } + ConnManagerParams.setMaxTotalConnections(params, maxTotalConnections); + ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(maxConnectionsPerRoute)); + if (retries >= 0) { + params.setIntParameter(RETRIES, retries); + } + params.setParameter("http.protocol.handle-redirects", followRedirects); + return params; + } + + public int getMaxTotalConnections() { + return maxTotalConnections; + } + + public void setMaxTotalConnections(int maxTotalConnections) { + ensureNotFrozen(); + this.maxTotalConnections = maxTotalConnections; + } + + public int getMaxConnectionsPerRoute() { + return maxConnectionsPerRoute; + } + + public void setMaxConnectionsPerRoute(int maxConnectionsPerRoute) { + ensureNotFrozen(); + this.maxConnectionsPerRoute = maxConnectionsPerRoute; + } + + public int getSocketBufferSizeBytes() { + return socketBufferSizeBytes; + } + + public void setSocketBufferSizeBytes(int socketBufferSizeBytes) { + ensureNotFrozen(); + this.socketBufferSizeBytes = socketBufferSizeBytes; + } + + public int getRetries() { + return retries; + } + + public void setRetries(int retries) { + ensureNotFrozen(); + this.retries = retries; + } + + public String getYcaProxy() { + return ycaProxy; + } + + public int getYcaPort() { + return ycaPort; + } + + public String getYcaApplicationId() { + return ycaApplicationId; + } + + public boolean getYcaUseProxy() { + return ycaUseProxy; + } + + public long getYcaTtl() { + return ycaTtl; + } + + public long getYcaRetry() { + return ycaRetry; + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java new file mode 100644 index 00000000000..c2bc6b2196b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java @@ -0,0 +1,260 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import com.google.common.collect.ImmutableList; +import com.yahoo.component.ComponentId; +import com.yahoo.jdisc.http.CertificateStore; +import com.yahoo.search.cache.QrBinaryCacheConfig; +import com.yahoo.search.cache.QrBinaryCacheRegionConfig; +import com.yahoo.yolean.Exceptions; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.federation.FederationSearcher; +import com.yahoo.search.query.Properties; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.statistics.Counter; +import com.yahoo.statistics.Statistics; +import com.yahoo.statistics.Value; + +import org.apache.http.HttpEntity; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Superclass of searchers which talks to HTTP backends. Implement a subclass to talk to a backend + * over HTTP which is not supported by the platform out of the box. + *

+ * Implementations must override one of the unmarshal methods to unmarshal the response. + *

+ * + * @author Arne Bergene Fossaa + * @author bratseth + */ +public abstract class HTTPProviderSearcher extends HTTPSearcher { + + private final Counter emptyResults; + private final Value hitsPerQuery; + private final Value responseLatency; + private final Counter readTimeouts; + + private final static List excludedSourceProperties = ImmutableList.of("offset", "hits", "provider"); + + protected final static Logger log = Logger.getLogger(HTTPProviderSearcher.class.getName()); + + /** The name of the cache used (which is just getid().stringValue(), or null if no cache is used */ + protected String cacheName=null; + + public HTTPProviderSearcher(ComponentId id, List connections,String path, Statistics statistics) { + this(id,connections,new HTTPParameters(path), statistics); + } + + /** Creates a http provider searcher using id.getName as provider name */ + public HTTPProviderSearcher(ComponentId id, List connections, String path, + Statistics statistics, CertificateStore certificateStore) { + this(id, connections, new HTTPParameters(path), statistics, certificateStore); + } + + public HTTPProviderSearcher(ComponentId id, List connections, HTTPParameters parameters, + Statistics statistics) { + this(id, connections, parameters, statistics, new ThrowingCertificateStore()); + } + + /** + * Creates a provider searcher + * + * @param id the id of this instance + * @param connections the connections this will load balance and fail over between + * @param parameters the parameters to use when making http calls + */ + public HTTPProviderSearcher(ComponentId id, List connections, HTTPParameters parameters, + Statistics statistics, CertificateStore certificateStore) { + super(id, connections, parameters, statistics, certificateStore); + String suffix = "_" + getId().getName().replace('.', '_'); + hitsPerQuery = new Value("hits_per_query" + suffix, statistics, + new Value.Parameters().setLogRaw(false).setNameExtension(false).setLogMean(true)); + responseLatency = new Value(LOG_LATENCY_START + suffix, statistics, + new Value.Parameters().setLogRaw(false).setLogMean(true).setNameExtension(false)); + emptyResults = new Counter("empty_results" + suffix, statistics, false); + readTimeouts = new Counter(LOG_READ_TIMEOUT_PREFIX + suffix, statistics, false); + } + + /** @deprecated this method does nothing */ + @Deprecated + protected void configureCache(final QrBinaryCacheConfig cacheConfig,final QrBinaryCacheRegionConfig regionConfig) { + } + + /** + * Unmarshal the stream by converting it to hits and adding the hits to the given result. + * A convenience hook called by the default unmarshal(entity,result). + * Override this in subclasses which does not override unmarshal(entity,result). + *

+ * This default implementation throws an exception. + * + * @param stream the stream of data returned + * @param contentLength the length of the content in bytes if known, or a negative number if unknown + * @param result the result to which unmarshalled data should be added + */ + public void unmarshal(final InputStream stream, long contentLength, final Result result) throws IOException { + throw new UnsupportedOperationException("Unmarshal must be implemented by " + this); + } + + /** + * Unmarshal the result from an http entity. This default implementation calls + * unmarshal(entity.getContent(), entity.getContentLength(), result) + * (and does some detailed query tracing). + * + * @param entity the entity containing the data to unmarshal + * @param result the result to which unmarshalled data should be added + */ + public void unmarshal(HttpEntity entity,Result result) throws IOException { + Query query=result.getQuery(); + long len = entity.getContentLength(); + if (query.getTraceLevel()>=4) + query.trace("Received " + len + " bytes response in " + this, false, 4); + query.trace("Unmarshaling result.", false, 6); + unmarshal(entity.getContent(), len, result); + + if (query.getTraceLevel()>=2) + query.trace("Handled " + len + " bytes response in " + this, false, 2); + + } + + protected void addNonExcludedSourceProperties(Query query, Map queryMap) { + Properties sourceProperties = FederationSearcher.getSourceProperties(query); + if (sourceProperties != null) { + for(Map.Entry entry : sourceProperties.listProperties("").entrySet()) { + if (!excludedSourceProperties.contains(entry.getKey())) { + queryMap.put(entry.getKey(), entry.getValue().toString()); + } + } + } + } + + /** + * Hook called at the moment the result is returned from this searcher. This default implementation + * does return result. + * + * @param result the result which is to be returned + * @param requestMeta the request information hit, or null if none was created (e.g if this was a cache lookup) + * @param e the exception caused during execution of this query, or null if none + * @return the result which is returned upwards + */ + protected Result inspectAndReturnFinalResult(Result result, Hit requestMeta, Exception e) { + return result; + } + + private Result statisticsBeforeInspection(Result result, + Hit requestMeta, Exception e) { + int hitCount = result.getConcreteHitCount(); + if (hitCount == 0) { + emptyResults.increment(); + } + hitsPerQuery.put((double) hitCount); + + if (requestMeta != null) { + requestMeta.setField(LOG_HITCOUNT, Integer.valueOf(hitCount)); + } + + return inspectAndReturnFinalResult(result, + requestMeta, e); + } + + + @Override + protected void logResponseLatency(long latency) { + responseLatency.put((double) latency); + } + + @Override + public Result search(Query query, Execution execution,Connection connection) { + // Create default meta hit for holding logging information + Hit requestMeta = createRequestMeta(); + Result result = new Result(query); + result.hits().add(requestMeta); + query.trace("Created request information hit", false, 9); + + try { + URI uri = getURI(query, requestMeta, connection); + if (query.getTraceLevel()>=1) + query.trace("Fetching " + uri.toString(), false, 1); + long requestStartTime = System.currentTimeMillis(); + + HttpEntity entity = getEntity(uri, requestMeta, query); + + // Why should consumeEntity call inspectAndReturnFinalResult itself? + // Seems confusing to me. + return entity == null + ? statisticsBeforeInspection(result, requestMeta, null) + : consumeEntity(entity, query, result, requestMeta, requestStartTime); + + } catch (MalformedURLException|URISyntaxException e) { + result.hits().addError(createMalformedUrlError(query,e)); + return statisticsBeforeInspection(result, requestMeta, e); + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout("No time left for HTTP traffic in " + + this + + " for " + query + ": " + e.getMessage())); + return statisticsBeforeInspection(result, requestMeta, e); + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "Error when trying to connect to HTTP backend in " + this + + " for " + query + ": " + Exceptions.toMessageString(e))); + return statisticsBeforeInspection(result, requestMeta, e); + } + } + + private Result consumeEntity(HttpEntity entity, Query query, Result result, Hit logHit, long requestStartTime) { + + try { + // remove some time from timeout to allow for close calls with return result + unmarshal(new TimedHttpEntity(entity, query.getStartTime(), Math.max(1, query.getTimeout() - 10)), result); + logHit.setField(LOG_LATENCY_FINISH, System.currentTimeMillis() - requestStartTime); + return statisticsBeforeInspection(result, logHit, null); + } catch (IOException e) { + result.hits().addError(ErrorMessage.createBackendCommunicationError( + "Error when trying to consume input in " + this + ": " + Exceptions.toMessageString(e))); + return statisticsBeforeInspection(result, logHit, e); + } catch (TimeoutException e) { + readTimeouts.increment(); + result.hits().addError(ErrorMessage + .createTimeout("Timed out while reading/unmarshaling from backend in " + + this + " for " + query + + ": " + e.getMessage())); + return statisticsBeforeInspection(result, logHit, e); + } finally { // TODO: The scope of this finally must be enlarged to release the connection also on errors + cleanupHttpEntity(entity); + } + } + + /** + * Returns the key-value pairs that should be added as properties to the request url sent to the service. + * Must be overridden in subclasses to add the key-values expected by the service in question, unless + * {@link #getURI} (from which this is called) is overridden. + *

+ * This default implementation returns the query.properties() prefixed by + * "source.[sourceName]" or "property.[propertyName]" + * (by calling {@link #addNonExcludedSourceProperties}). + */ + @Override + public Map getQueryMap(Query query) { + Map queryMap = super.getQueryMap(query); + addNonExcludedSourceProperties(query, queryMap); + return queryMap; + } + + /** + * @deprecated the cache key is ignored as there is no built-in caching support + */ + @Deprecated + public abstract Map getCacheKey(Query q); + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java new file mode 100644 index 00000000000..65ce7b3647c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java @@ -0,0 +1,958 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import com.google.inject.Inject; +import com.yahoo.component.ComponentId; +import com.yahoo.jdisc.http.CertificateStore; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.yolean.Exceptions; +import com.yahoo.search.Query; +import com.yahoo.search.cluster.ClusterSearcher; +import com.yahoo.search.federation.ProviderConfig.PingOption; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.statistics.Counter; +import com.yahoo.statistics.Statistics; +import com.yahoo.text.Utf8; + +import org.apache.http.*; +import org.apache.http.client.HttpClient; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.params.ConnManagerParams; +import org.apache.http.conn.params.ConnRoutePNames; +import org.apache.http.conn.routing.HttpRoutePlanner; +import org.apache.http.conn.scheme.PlainSocketFactory; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.conn.DefaultHttpRoutePlanner; +import org.apache.http.impl.conn.SingleClientConnManager; +import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; +import org.apache.http.params.HttpParams; +import org.apache.http.params.HttpProtocolParams; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpRequestExecutor; +import org.apache.http.util.EntityUtils; + +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.net.*; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Generic superclass of searchers making connections to some HTTP service. This + * supports clustered connections - a list of alternative servers may be given, + * requests will be hashed across these and failed over in case some are down. + *

+ * This simply provides some utility methods for working with http connections + * and implements ping against the service. + * + *

This searcher contains code from the Apache httpcomponents client library, + * licensed to the Apache Software Foundation under the Apache License, Version + * 2.0. Please refer to http://www.apache.org/licenses/LICENSE-2.0 for details. + * + *

This class automatically adds a meta hit containing latency and other + * meta information about the obtained HTTP data using createRequestMeta(). + * The fields available in the hit are:

+ * + *
+ * HTTPSearcher.LOG_LATENCY_START + *
+ * The latency of the external provider answering a request. + *
+ * HTTPSearcher.LOG_LATENCY_FINISH + *
+ * Total time of the HTTP traffic, but also decoding of the data, as this + * happens at the same time. + *
+ * HTTPSearcher.LOG_HITCOUNT + *
+ * Number of concrete hits in the result returned by this provider. + *
+ * HTTPSearcher.LOG_URI + *
+ * The complete URI used for external service. + *
+ * HTTPSearcher.LOG_SCHEME + *
+ * The scheme of the request URI sent. + *
+ * HTTPSearcher.LOG_HOST + *
+ * The host used for the request URI sent. + *
+ * HTTPSearcher.LOG_PORT + *
+ * The port used for the request URI sent. + *
+ * HTTPSearcher.LOG_PATH + *
+ * Path element of the request URI sent. + *
+ * HTTPSearcher.LOG_STATUS + *
+ * Status code of the HTTP response. + *
+ * HTTPSearcher.LOG_PROXY_TYPE + *
+ * The proxy type used, if any. Default is "http". + *
+ * HTTPSearcher.LOG_PROXY_HOST + *
+ * The proxy host, if any. + *
+ * HTTPSearcher.LOG_PROXY_PORT + *
+ * The proxy port, if any. + *
+ * HTTPSearcher.LOG_HEADER_PREFIX prepended to request header field name + *
+ * The content of any additional request header fields. + *
+ * HTTPSearcher.LOG_RESPONSE_HEADER_PREFIX prepended to response header field name + *
+ * The content of any additional response header fields. + *
+ * + * @author Arne Bergene Fossaa + */ +public abstract class HTTPSearcher extends ClusterSearcher { + + protected static final String YCA_HTTP_HEADER = "Yahoo-App-Auth"; + + private static final Charset iso8859Charset = Charset.forName("ISO-8859-1"); + + // Logging field name constants + public static final String LOG_PATH = "path"; + public static final String LOG_PORT = "port"; + public static final String LOG_HOST = "host"; + public static final String LOG_IP_ADDRESS = "ip_address"; + public static final String IP_ADDRESS_UNKNOWN = "unknown"; + + public static final String LOG_SCHEME = "scheme"; + public static final String LOG_URI = "uri"; + public static final String LOG_PROXY_PORT = "proxy_port"; + public static final String LOG_PROXY_HOST = "proxy_host"; + public static final String LOG_PROXY_TYPE = "proxy_type"; + public static final String LOG_STATUS = "status"; + public static final String LOG_LATENCY_FINISH = "latency_finish"; + public static final String LOG_LATENCY_START = "latency_start"; + public static final String LOG_LATENCY_CONNECT = "latency_connect"; + public static final String LOG_QUERY_PARAM_PREFIX = "query_param_"; + public static final String LOG_HEADER_PREFIX = "header_"; + public static final String LOG_RESPONSE_HEADER_PREFIX = "response_header_"; + public static final String LOG_HITCOUNT = "hit_count"; + public static final String LOG_CONNECT_TIMEOUT_PREFIX = "connect_timeout_"; + public static final String LOG_READ_TIMEOUT_PREFIX = "read_timeout_"; + + protected final Logger log = Logger.getLogger(HTTPSearcher.class.getName()); + + /** The HTTP parameters to use. Assigned in the constructor */ + private HTTPParameters httpParameters; + + private final Counter connectTimeouts; + + /** Whether to use certificates */ + protected boolean useCertificate = false; + + private final CertificateStore certificateStore; + + /** The (optional) YCA application ID. */ + private String ycaApplicationId = null; + + /** The (optional) YCA proxy */ + protected HttpHost ycaProxy = null; + + /** YCA cache TTL in ms */ + private long ycaTtl = 0L; + + /** YCA retry rate in the cache if no cert is found, in ms */ + private long ycaRetry = 0L; + + /** Set at construction if this is using persistent connections */ + private ClientConnectionManager sharedConnectionManager = null; + + /** Set at construction if using non-persistent connections */ + private ThreadLocal singleClientConnManagerThreadLocal = null; + + private static final SchemeRegistry schemeRegistry = new SchemeRegistry(); + + static { + schemeRegistry.register(new Scheme("http", PlainSocketFactory + .getSocketFactory(), 80)); + schemeRegistry.register(new Scheme("https", SSLSocketFactory + .getSocketFactory(), 443)); + } + + public HTTPSearcher(ComponentId componentId, List connections,String path, Statistics statistics) { + this(componentId, connections, new HTTPParameters(path), statistics, new ThrowingCertificateStore()); + } + + /** Creates a http searcher with default connection and read timeouts (currently 2 and 5s respectively) */ + public HTTPSearcher(ComponentId componentId, List connections,String path, Statistics statistics, + CertificateStore certificateStore) { + this(componentId, connections, new HTTPParameters(path), statistics, certificateStore); + } + + public HTTPSearcher(ComponentId componentId, List connections, HTTPParameters parameters, + Statistics statistics) { + this(componentId, connections, parameters, statistics, new ThrowingCertificateStore()); + } + /** + * Creates a http searcher + * + * @param componentId the id of this instance + * @param connections the connections to establish to the backend nodes + * @param parameters the http parameters to use. This object will be frozen if it isn't already + */ + @Inject + public HTTPSearcher(ComponentId componentId, List connections, HTTPParameters parameters, + Statistics statistics, CertificateStore certificateStore) { + super(componentId,connections,false); + String suffix = "_" + getId().getName().replace('.', '_'); + + connectTimeouts = new Counter(LOG_CONNECT_TIMEOUT_PREFIX + suffix, statistics, false); + + parameters.freeze(); + this.httpParameters = parameters; + this.certificateStore = certificateStore; + + if (parameters.getPersistentConnections()) { + HttpParams params=parameters.toHttpParams(); + HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1); + ConnManagerParams.setTimeout(params, 10); + sharedConnectionManager = new ThreadSafeClientConnManager(params, schemeRegistry); + Thread connectionPurgerThread = new Thread(() -> { + //this is the default value in yahoo jvm installations + long DNSTTLSec = 120; + while (true) { + try { + Thread.sleep(DNSTTLSec * 1000); + if (sharedConnectionManager == null) + continue; + + sharedConnectionManager.closeExpiredConnections(); + DNSTTLSec = Long.valueOf(java.security.Security + .getProperty("networkaddress.cache.ttl")); + //No DNS TTL, no need to close idle connections + if (DNSTTLSec <= 0) { + DNSTTLSec = 120; + continue; + } + sharedConnectionManager.closeIdleConnections(2 * DNSTTLSec, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return; + } catch (NumberFormatException e) { + continue; + } + } + }); + connectionPurgerThread.setDaemon(true); + connectionPurgerThread.start(); + + } + else { + singleClientConnManagerThreadLocal =new ThreadLocal<>(); + } + + initializeYCA(httpParameters, certificateStore); + } + + /** + * Initialize YCA certificate and proxy if they have been set to non-null, + * non-empty values. It will wrap thrown exceptions from the YCA layer into + * RuntimeException and propagate them. + */ + private void initializeYCA(HTTPParameters parameters, CertificateStore certificateStore) { + String applicationId = parameters.getYcaApplicationId(); + String proxy = parameters.getYcaProxy(); + int port = parameters.getYcaPort(); + long ttl = parameters.getYcaTtl(); + long retry = parameters.getYcaRetry(); + + if (applicationId != null && !applicationId.trim().isEmpty()) { + initializeCertificate(applicationId, ttl, retry, certificateStore); + } + + if (parameters.getYcaUseProxy()) { + initializeProxy(proxy, port); + } + } + + /** Returns the HTTP parameters used in this. This is always frozen */ + public HTTPParameters getParameters() { return httpParameters; } + + /** + * Returns the key-value pairs that should be added as properties to the request url sent to the service. + * Must be overridden in subclasses to add the key-values expected by the service in question, unless + * {@link #getURI} (from which this is called) is overridden. + *

+ * This default implementation returns an empty LinkedHashMap. + */ + public Map getQueryMap(Query query) { + return new LinkedHashMap<>(); + } + + /** + * Initialize the YCA certificate. + * This will warn but not throw if certificates could not be loaded, as the certificates + * are external state which can fail independently. + */ + private void initializeCertificate(String applicationId, long ttl, long retry, CertificateStore certificateStore) { + try { + // get the certificate, i.e. init the cache and check integrity + String certificate = certificateStore.getCertificate(applicationId, ttl, retry); + if (certificate == null) { + getLogger().log(LogLevel.WARNING, "No certificate found for application '" + applicationId + "'"); + return; + } + + this.useCertificate = true; + this.ycaApplicationId = applicationId; + this.ycaTtl = ttl; + this.ycaRetry = retry; + getLogger().log(LogLevel.CONFIG, "Got certificate: " + certificate); + } + catch (Exception e) { + getLogger().log(LogLevel.WARNING,"Exception while initializing certificate for application '" + + applicationId + "' in " + this, e); + } + } + + /** + * Initialize the YCA proxy setting. + */ + private void initializeProxy(String host, int port) { + ycaProxy = new HttpHost(host, port); + getLogger().log(LogLevel.CONFIG,"Proxy is configured; will use proxy: " + ycaProxy); + } + + /** + * Same a {@code getURI(query, offset, hits, null)}. + * @see #getURI(Query, Hit, Connection) + */ + protected URI getURI(Query query,Connection connection) throws MalformedURLException, URISyntaxException { + Hit requestMeta; + try { + requestMeta = (Hit) query.properties().get(HTTPClientSearcher.REQUEST_META_CARRIER); + } catch (ClassCastException e) { + requestMeta = null; + } + return getURI(query, requestMeta, connection); + } + + /** + * Creates the URI for a query. + * Populates the {@code requestMeta} meta hit with the created URI HTTP properties. + * + * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}). + */ + protected URI getURI(Query query, Hit requestMeta, Connection connection) + throws MalformedURLException, URISyntaxException { + StringBuilder parameters = new StringBuilder(); + + Map queries = getQueryMap(query); + if (queries.size() > 0) { + Iterator> mapIterator = queries.entrySet().iterator(); + parameters.append("?"); + try { + Map.Entry entry; + while (mapIterator.hasNext()) { + entry = mapIterator.next(); + + if (requestMeta != null) + requestMeta.setField(LOG_QUERY_PARAM_PREFIX + + entry.getKey(), entry.getValue()); + + parameters.append(entry.getKey() + "=" + URLEncoder.encode(entry.getValue(), + httpParameters.getInputEncoding())); + if (mapIterator.hasNext()) { + parameters.append("&"); + } + } + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Unknown input encoding set in " + this, e); + } + } + + URI uri = new URL(httpParameters.getSchema(), connection.getHost(), + connection.getPort(), getPath() + parameters.toString()).toURI(); + if (requestMeta != null) { + requestMeta.setField(LOG_URI, uri.toString()); + requestMeta.setField(LOG_SCHEME, uri.getScheme()); + requestMeta.setField(LOG_HOST, uri.getHost()); + requestMeta.setField(LOG_PORT, uri.getPort()); + requestMeta.setField(LOG_PATH, uri.getPath()); + } + return uri; + } + + /** + * Called by getURI() to get the path of the URI for the external service. + * The default implementation returns httpParameters.getPath(); subclasses + * which only wants to override the path from httpParameters may use this + * method instead of overriding all of getURI(). + * + * @return the path to use for getURI + */ + protected String getPath() { + return httpParameters.getPath(); + } + + /** + * The URI that is used to check if the provider is up or down. This will again be used in the + * checkPing method by checking that we get a response that has a good status code (below 300). If better + * validation than just status code checking is needed, override the checkPing method. + */ + protected URI getPingURI(Connection connection) throws MalformedURLException, URISyntaxException { + return new URL(httpParameters.getSchema(),connection.getHost(),connection.getPort(),getPingPath()).toURI(); + } + + /** + * Called by getPingURI() to get the path of the URI for pinging the + * external service. The default implementation returns + * httpParameters.getPath(); subclasses which only wants to override the + * path from httpParameters may use this method instead of overriding all of + * getPingURI(). + * + * @return the path to use for getPingURI + */ + protected String getPingPath() { + return httpParameters.getPath(); + } + + /** + * Checks if the response is valid. + * @param response The response from the ping request + * @param pong The pong result to return back to the calling method. This method + * will add an error to the pong result (using addError) if the status of the HTTP response is 300 or above. + */ + protected void checkPing(HttpResponse response, Pong pong) { + if (response.getStatusLine().getStatusCode() >= 300) { + pong.addError(com.yahoo.search.result.ErrorMessage.createBackendCommunicationError( + "Got error " + response.getStatusLine().getStatusCode() + + " when contacting backend") + ); + } + } + + /** + * Pinging in HTTPBackend is done by creating a PING uri from http://host:port/path. + * If this returns a status that is below 300, the ping is considered good. + * + * If another uri is needed for pinging, reimplement getPingURI. + * + * Override either this method to change how ping + */ + @Override + public Pong ping(Ping ping, Connection connection) { + URI uri = null; + Pong pong = new Pong(); + HttpResponse response = null; + + if (httpParameters.getPingOption() == PingOption.DISABLE) + return pong; + + try { + uri = getPingURI(connection); + if (uri == null) + pong.addError(ErrorMessage.createIllegalQuery("Ping uri is null")); + if (uri.getHost()==null) { + pong.addError(ErrorMessage.createIllegalQuery("Ping uri has no host")); + uri=null; + } + } catch (MalformedURLException | URISyntaxException e) { + pong.addError(ErrorMessage.createIllegalQuery("Malformed ping uri '" + uri + "': " + + Exceptions.toMessageString(e))); + } catch (RuntimeException e) { + log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e); + pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " + + Exceptions.toMessageString(e))); + } + + if (uri == null) return pong; + pong.setPingInfo("using uri '" + uri + "'"); + + try { + response = getPingResponse(uri, ping); + checkPing(response, pong); + } catch (IOException e) { + //We do not have a valid ping + pong.addError(ErrorMessage.createBackendCommunicationError( + "Exception thrown when pinging with url '" + uri + "': " + Exceptions.toMessageString(e))); + } catch (TimeoutException e) { + pong.addError(ErrorMessage.createTimeout("Timeout for ping " + + uri + " in " + this + ": " + e.getMessage())); + } catch (RuntimeException e) { + log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e); + pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " + + Exceptions.toMessageString(e))); + } finally { + if (response != null) { + cleanupHttpEntity(response.getEntity()); + } + } + + return pong; + } + + private HttpResponse getPingResponse(URI uri, Ping ping) throws IOException { + long timeLeft = ping.getTimeout(); + int connectionTimeout = (int) (timeLeft / 4L); + int readTimeout = (int) (timeLeft * 3L / 4L); + + Map requestHeaders = null; + if (httpParameters.getPingOption() == PingOption.YCA) + requestHeaders = generateYCAHeaders(); + + return getResponse(uri, null, requestHeaders, null, connectionTimeout, readTimeout); + } + + /** + * Same a {@code getEntity(uri, null)}. + * @param uri resource to fetch + * @param query the originating query + * @throws TimeoutException If query.timeLeft() equal to or lower than 0 + */ + protected HttpEntity getEntity(URI uri, Query query) throws IOException{ + return getEntity(uri, null, query); + } + + + /** + * Gets the HTTP entity that holds the response contents. + * @param uri the request URI. + * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}). + * @param query the originating query + * @return the http entity, or null if none + * @throws java.io.IOException Whenever HTTP status code is in the 300 or higher range. + * @throws TimeoutException If query.timeLeft() equal to or lower than 0 + */ + protected HttpEntity getEntity(URI uri, Hit requestMeta, Query query) throws IOException { + if (query.getTimeLeft() <= 0) { + throw new TimeoutException("No time left for querying external backend."); + } + HttpResponse response = getResponse(uri, requestMeta, query); + StatusLine statusLine = response.getStatusLine(); + + // Logging + if (requestMeta != null) { + requestMeta.setField(LOG_STATUS, statusLine.getStatusCode()); + for (HeaderIterator headers = response.headerIterator(); headers.hasNext(); ) { + Header h = headers.nextHeader(); + requestMeta.setField(LOG_RESPONSE_HEADER_PREFIX + h.getName(), h.getValue()); + } + } + + if (statusLine.getStatusCode() >= 300) { + HttpEntity entity = response.getEntity(); + String message = createServerReporterErrorMessage(statusLine, entity); + cleanupHttpEntity(response.getEntity()); + throw new IOException(message); + } + + return response.getEntity(); + } + + private String createServerReporterErrorMessage(StatusLine statusLine, HttpEntity entity) { + String message = "Error when trying to connect to HTTP backend: " + + statusLine.getStatusCode() + " : " + statusLine.getReasonPhrase(); + + try { + if (entity != null) { + message += "(Message = " + EntityUtils.toString(entity) + ")"; + } + } catch (Exception e) { + log.log(LogLevel.WARNING, "Could not get message.", e); + } + + return message; + } + + /** + * Creates a meta hit dedicated to holding logging information. This hit has + * the 'logging:[searcher's ID]' type. + */ + protected Hit createRequestMeta() { + Hit requestMeta = new Hit("logging:" + getId().toString()); + requestMeta.setMeta(true); + requestMeta.types().add("logging"); + return requestMeta; + } + + protected void cleanupHttpEntity(HttpEntity entity) { + if (entity == null) return; + + try { + entity.consumeContent(); + } catch (IOException e) { + // It is ok if do not consume it, the resource will be freed after + // timeout. + // But log it just in case. + log.log(LogLevel.getVespaLogLevel(LogLevel.DEBUG), + "Not able to consume after processing: " + Exceptions.toMessageString(e)); + } + } + + /** + * Same as {@code getResponse(uri, null)}. + */ + protected HttpResponse getResponse(URI uri, Query query) throws IOException{ + return getResponse(uri, null, query); + } + + /** + * Executes an HTTP request and gets the response. + * @param uri the request URI. + * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}). + * @param query the originating query, used to calculate timeouts + */ + protected HttpResponse getResponse(URI uri, Hit requestMeta, Query query) throws IOException { + long timeLeft = query.getTimeLeft(); + int connectionTimeout = (int) (timeLeft / 4L); + int readTimeout = (int) (timeLeft * 3L / 4L); + connectionTimeout = connectionTimeout <= 0 ? 1 : connectionTimeout; + readTimeout = readTimeout <= 0 ? 1 : readTimeout; + HttpEntity reqEntity = getRequestEntity(query, requestMeta); + Map reqHeaders = getRequestHeaders(query, requestMeta); + if ((reqEntity == null) && (reqHeaders == null)) { + return getResponse(uri, requestMeta, connectionTimeout, readTimeout); + } else { + return getResponse(uri, reqEntity, reqHeaders, requestMeta, connectionTimeout, readTimeout); + } + } + + /** + * Returns the set of headers to be passed in the http request to provider backend. The default + * implementation returns null, unless YCA is in use. If YCA is used, it will return a map + * only containing the needed YCA headers. + */ + protected Map getRequestHeaders(Query query, Hit requestMeta) { + if (useCertificate) { + return generateYCAHeaders(); + } + return null; + } + + /** + * Returns the HTTP request entity to use when making the request for this query. + * This default implementation returns null. + * + *

Do return a repeatable entity if HTTP retry is active. + * + * @return the http request entity to use, or null to use the default entity + */ + protected HttpEntity getRequestEntity(Query query, Hit requestMeta) { + return null; + } + + /** + * Executes an HTTP request and gets the response. + * @param uri the request URI. + * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}). + * @param connectionTimeout how long to wait for getting a connection + * @param readTimeout timeout for reading HTTP data + */ + protected HttpResponse getResponse(URI uri, Hit requestMeta, int connectionTimeout, int readTimeout) + throws IOException { + return getResponse(uri, null, null, requestMeta, connectionTimeout, readTimeout); + } + + + /** + * Executes an HTTP request and gets the response. + * @param uri the request URI. + * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}). + * @param connectionTimeout how long to wait for getting a connection + * @param readTimeout timeout for reading HTTP data + */ + protected HttpResponse getResponse(URI uri, HttpEntity reqEntity, + Map reqHeaders, Hit requestMeta, + int connectionTimeout, int readTimeout) throws IOException { + + HttpParams httpParams = httpParameters.toHttpParams(connectionTimeout, readTimeout); + HttpClient httpClient = createClient(httpParams); + long start = 0L; + HttpUriRequest request; + if (httpParameters.getEnableProxy() && "http".equals(httpParameters.getProxyType())) { + HttpHost proxy = new HttpHost(httpParameters.getProxyHost(), + httpParameters.getProxyPort(), httpParameters.getProxyType()); + httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy); + // Logging + if (requestMeta != null) { + requestMeta.setField(LOG_PROXY_TYPE, httpParameters.getProxyType()); + requestMeta.setField(LOG_PROXY_HOST, httpParameters.getProxyHost()); + requestMeta.setField(LOG_PROXY_PORT, httpParameters.getProxyPort()); + } + } + if (reqEntity == null) { + request = createRequest(httpParameters.getMethod(), uri); + } else { + request = createRequest(httpParameters.getMethod(), uri, reqEntity); + } + + if (reqHeaders != null) { + for (Entry entry : reqHeaders.entrySet()) { + if (entry.getValue() == null || isAscii(entry.getValue())) { + request.addHeader(entry.getKey(), entry.getValue()); + } else { + byte[] asBytes = Utf8.toBytes(entry.getValue()); + String asLyingString = new String(asBytes, 0, asBytes.length, iso8859Charset); + request.addHeader(entry.getKey(), asLyingString); + } + } + } + + // Logging + if (requestMeta != null) { + for (HeaderIterator headers = request.headerIterator(); headers.hasNext();) { + Header h = headers.nextHeader(); + requestMeta.setField(LOG_HEADER_PREFIX + h.getName(), h.getValue()); + } + start = System.currentTimeMillis(); + } + + HttpResponse response; + + try { + HttpContext context = new BasicHttpContext(); + response = httpClient.execute(request, context); + + if (requestMeta != null) { + requestMeta.setField(LOG_IP_ADDRESS, getIpAddress(context)); + } + } catch (ConnectTimeoutException e) { + connectTimeouts.increment(); + throw e; + } + + // Logging + long latencyStart = System.currentTimeMillis() - start; + if (requestMeta != null) { + requestMeta.setField(LOG_LATENCY_START, latencyStart); + } + logResponseLatency(latencyStart); + return response; + } + + private String getIpAddress(HttpContext context) { + HttpConnection connection = (HttpConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION); + if (connection instanceof HttpInetConnection) { + InetAddress address = ((HttpInetConnection) connection).getRemoteAddress(); + String hostAddress = address.getHostAddress(); + return hostAddress == null ? + IP_ADDRESS_UNKNOWN: + hostAddress; + } else { + getLogger().log(LogLevel.DEBUG, "Unexpected connection type: " + connection.getClass().getName()); + return IP_ADDRESS_UNKNOWN; + } + } + + private boolean isAscii(String value) { + char[] scanBuffer = new char[value.length()]; + value.getChars(0, value.length(), scanBuffer, 0); + for (char c: scanBuffer) + if (c > 127) return false; + return true; + } + + protected void logResponseLatency(long latency) { } + + /** + * Creates a http client for one request. Override to customize the client + * to use, e.g for testing. This default implementation will add the YCA + * proxy to params if is necessary, and then do + * return new SearcherHttpClient(getConnectionManager(params), params); + */ + protected HttpClient createClient(HttpParams params) { + if (ycaProxy != null) { + params.setParameter(ConnRoutePNames.DEFAULT_PROXY, ycaProxy); + } + return new SearcherHttpClient(getConnectionManager(params), params); + } + + /** + * Creates a HttpRequest. Override to customize the request. + * This default implementation does return new HttpRequest(method,uri); + */ + protected HttpUriRequest createRequest(String method,URI uri) { + return createRequest(method, uri, null); + } + + /** + * Creates a HttpRequest. Override to customize the request. + * This default implementation does return new HttpRequest(method,uri); + */ + protected HttpUriRequest createRequest(String method,URI uri, HttpEntity entity) { + return new SearcherHttpRequest(method,uri); + } + + /** Get a connection manager which may be used safely from this thread */ + protected ClientConnectionManager getConnectionManager(HttpParams params) { + if (sharedConnectionManager != null) {// We are using shared connections + return sharedConnectionManager; + } else { + SingleClientConnManager singleClientConnManager = singleClientConnManagerThreadLocal.get(); + if (singleClientConnManager == null) { + singleClientConnManager = new SingleClientConnManager(params, schemeRegistry); + singleClientConnManagerThreadLocal.set(singleClientConnManager); + } + return singleClientConnManager; + } + } + + /** Utility method for creating error messages when a url is incorrect */ + protected ErrorMessage createMalformedUrlError(Query query,Exception e) { + return ErrorMessage.createErrorInPluginSearcher("Malformed url in " + this + " for " + query + + ": " + Exceptions.toMessageString(e)); + } + + private Map generateYCAHeaders() { + Map headers = new HashMap<>(); + String certificate = certificateStore.getCertificate(ycaApplicationId, ycaTtl, ycaRetry); + headers.put(YCA_HTTP_HEADER, certificate); + return headers; + } + + protected static class SearcherHttpClient extends DefaultHttpClient { + + private final int retries; + + public SearcherHttpClient(final ClientConnectionManager conman, final HttpParams params) { + super(conman, params); + retries = params.getIntParameter(HTTPParameters.RETRIES, 1); + addRequestInterceptor((request, context) -> { + if (!request.containsHeader("Accept-Encoding")) { + request.addHeader("Accept-Encoding", "gzip"); + } + }); + addResponseInterceptor((response, context) -> { + HttpEntity entity = response.getEntity(); + if (entity == null) return; + Header ceheader = entity.getContentEncoding(); + if (ceheader == null) return; + for (HeaderElement codec : ceheader.getElements()) { + if (codec.getName().equalsIgnoreCase("gzip")) { + response.setEntity(new GzipDecompressingEntity(response.getEntity())); + return; + } + } + }); + } + + @Override + protected HttpRequestExecutor createRequestExecutor() { + return new HttpRequestExecutor(); + } + + @Override + protected HttpRoutePlanner createHttpRoutePlanner() { + return new DefaultHttpRoutePlanner(getConnectionManager().getSchemeRegistry()); + } + + @Override + protected HttpRequestRetryHandler createHttpRequestRetryHandler() { + return new SearcherHttpRequestRetryHandler(retries); + } + } + + /** A retry handler which avoids retrying forever on errors misclassified as transient */ + private static class SearcherHttpRequestRetryHandler implements HttpRequestRetryHandler { + private final int retries; + + public SearcherHttpRequestRetryHandler(int retries) { + this.retries = retries; + } + + @Override + public boolean retryRequest(IOException e, int executionCount, HttpContext httpContext) { + if (e == null) { + throw new IllegalArgumentException("Exception parameter may not be null"); + } + if (executionCount > retries) { + return false; + } + if (e instanceof NoHttpResponseException) { + // Retry if the server dropped connection on us + return true; + } + if (e instanceof InterruptedIOException) { + // Timeout from federation layer + return false; + } + if (e instanceof UnknownHostException) { + // Unknown host + return false; + } + if (e instanceof SSLHandshakeException) { + // SSL handshake exception + return false; + } + return true; + } + + + } + + private static class SearcherHttpRequest extends HttpRequestBase { + String method; + + public SearcherHttpRequest(String method, final URI uri) { + super(); + this.method = method; + setURI(uri); + } + + @Override + public String getMethod() { + return method; + } + } + + /** + * Only for testing. + */ + public void shutdownConnectionManagers() { + ClientConnectionManager manager; + if (sharedConnectionManager != null) { + manager = sharedConnectionManager; + } else { + manager = singleClientConnManagerThreadLocal.get(); + } + if (manager != null) { + manager.shutdown(); + } + } + + protected static final class ThrowingCertificateStore implements CertificateStore { + + @Override + public String getCertificate(String key, long ttl, long retry) { + throw new UnsupportedOperationException("A certificate store is not available"); + } + + } + +} + diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java new file mode 100644 index 00000000000..9d89a318c32 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java @@ -0,0 +1,88 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; + +/** + * Wrapper for adding timeout to an HttpEntity instance. + * + * @author Steinar Knutsen + */ +public class TimedHttpEntity implements HttpEntity { + /** + * The wrapped entity. Never null. + */ + private final HttpEntity entity; + private final long startTime; + private final long timeout; + + public TimedHttpEntity(HttpEntity entity, long startTime, long timeout) { + if (entity == null) { + throw new IllegalArgumentException("TimedHttpEntity cannot be instantiated with null HttpEntity."); + } + this.entity = entity; + this.startTime = startTime; + this.timeout = timeout; + } + + + @Override + public InputStream getContent() throws IOException, IllegalStateException { + InputStream content = entity.getContent(); + if (content == null) { + return null; + } else { + return new TimedStream(content, startTime, timeout); + } + } + + + // START OF PURE FORWARDING METHODS + @Override + public void consumeContent() throws IOException { + entity.consumeContent(); + } + + + @Override + public Header getContentEncoding() { + return entity.getContentEncoding(); + } + + @Override + public long getContentLength() { + return entity.getContentLength(); + } + + @Override + public Header getContentType() { + return entity.getContentType(); + } + + @Override + public boolean isChunked() { + return entity.isChunked(); + } + + @Override + public boolean isRepeatable() { + return entity.isRepeatable(); + } + + @Override + public boolean isStreaming() { + return entity.isStreaming(); + } + + @Override + public void writeTo(OutputStream outstream) throws IOException { + entity.writeTo(outstream); + } + // END OF PURE FORWARDING METHODS + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java new file mode 100644 index 00000000000..02777afb43c --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java @@ -0,0 +1,111 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A stream which throws a TimeoutException if query timeout has been reached. + * + * @author Steinar Knutsen + */ +public class TimedStream extends InputStream { + + /** + * A time barrier value, the point in time from which on read operations will cause an exception. + */ + private final long limit; + + /** + * A wrapped InputStream instance. + */ + private final InputStream content; + + /** + * Wrap an InputStream to make read operations potentially fire off + * TimeoutException. + * + *

Typical use would be
+ * new TimedStream(httpEntity.getContent(), query.getStartTime(), query.getTimeout()) + * + * @param content + * the InputStream to wrap + * @param startTime + * start time of query + * @param timeout + * how long the query is allowed to run + */ + public TimedStream(InputStream content, long startTime, long timeout) { + if (content == null) { + throw new IllegalArgumentException("Cannot instantiate TimedStream with null InputStream"); + } + this.content = content; + // The reasion for doing it in here instead of outside the constructor + // is this makes the usage of the class more intuitive IMHO + this.limit = startTime + timeout; + } + + private void checkTime(String message) { + if (System.currentTimeMillis() >= limit) { + throw new TimeoutException(message); + } + } + + // START FORWARDING METHODS: + // All methods below are forwarding methods to the contained stream, where + // some do a timeout check. + @Override + public int read() throws IOException { + int data = content.read(); + checkTime("Timed out during read()."); + return data; + } + + @Override + public int available() throws IOException { + return content.available(); + } + + @Override + public void close() throws IOException { + content.close(); + } + + @Override + public synchronized void mark(int readlimit) { + content.mark(readlimit); + } + + @Override + public boolean markSupported() { + return content.markSupported(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int length = content.read(b, off, len); + checkTime("Timed out during read(byte[], int, int)"); + return length; + } + + @Override + public int read(byte[] b) throws IOException { + int length = content.read(b); + checkTime("Timed out during read(byte[])"); + return length; + } + + @Override + public synchronized void reset() throws IOException { + content.reset(); + } + + @Override + public long skip(long n) throws IOException { + long skipped = content.skip(n); + checkTime("Timed out during skip(long)"); + return skipped; + } + // END FORWARDING METHODS + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java new file mode 100644 index 00000000000..9e0536ea053 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.http; + +/** + * Timeout marker for slow HTTP connections. + * + * @author Steinar Knutsen + */ +public class TimeoutException extends RuntimeException { + + /** + * Auto-generated version ID. + */ + private static final long serialVersionUID = 7084147598258586559L; + + public TimeoutException(String message) { + super(message); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java new file mode 100644 index 00000000000..aa3d249ab66 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.search.federation.http; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-search/src/main/java/com/yahoo/search/federation/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/package-info.java new file mode 100644 index 00000000000..008e339db4b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/package-info.java @@ -0,0 +1,17 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * The federation layer on top of the search container. This contains + * + *

    + *
  • A model of Sources which can be selected in and for a Query and which are implemented + * by a Search Chain, and Providers which represents the connection to specific backends (these + * two are often 1-1 but not always) + *
  • The federation searcher responsible for forking a query to multiple sources in parallel + *
  • A simple searcher which can talk to other vespa services + *
+ */ +@ExportPackage +package com.yahoo.search.federation; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java b/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java new file mode 100644 index 00000000000..676292d6a3a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.selection; + +import java.util.Optional; +import com.yahoo.component.chain.Chain; +import com.yahoo.search.Searcher; +import com.yahoo.search.searchchain.model.federation.FederationOptions; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Represents a search chain that the federation searcher should send a query to, + * along with a timeout and + * custom data reserved for use by the TargetSelector. + * + * @author tonytv + */ +public final class FederationTarget { + private final Chain chain; + private final FederationOptions federationOptions; + private final T customData; + + public FederationTarget(Chain chain, FederationOptions federationOptions, T customData) { + checkNotNull(chain); + checkNotNull(federationOptions); + + this.chain = chain; + this.federationOptions = federationOptions; + this.customData = customData; + } + + public Chain getChain() { + return chain; + } + + public FederationOptions getFederationOptions() { + return federationOptions; + } + + /** + * Any data that the TargetSelector wants to associate with this target. + * Owned exclusively by the TargetSelector that created this instance. + */ + public T getCustomData() { + return customData; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FederationTarget that = (FederationTarget) o; + + if (!chain.equals(that.chain)) return false; + if (customData != null ? !customData.equals(that.customData) : that.customData != null) return false; + if (!federationOptions.equals(that.federationOptions)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = chain.hashCode(); + result = 31 * result + federationOptions.hashCode(); + return result; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java b/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java new file mode 100644 index 00000000000..0f6bf2d5b71 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java @@ -0,0 +1,35 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.selection; + +import com.yahoo.processing.execution.chain.ChainRegistry; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.Searcher; +import com.yahoo.search.federation.selection.FederationTarget; + +import java.util.Collection; + +/** + * Allows adding extra targets that the federation searcher should federate to. + * + * For each federation search call, the federation searcher will call targetSelector.getTargets. + * + * Then, for each target, it will: + * 1) call modifyTargetQuery(target, query) + * 2) call modifyTargetResult(target, result) + * + * @author tonytv + */ +public interface TargetSelector { + Collection> getTargets(Query query, ChainRegistry searcherChainRegistry); + + /** + * For modifying the query before sending it to a the target + */ + void modifyTargetQuery(FederationTarget target, Query query); + + /** + * For modifying the result produced by the target. + */ + void modifyTargetResult(FederationTarget target, Result result); +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java new file mode 100644 index 00000000000..f3c289f6b43 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.search.federation.selection; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java new file mode 100644 index 00000000000..7e82801d85f --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java @@ -0,0 +1,37 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.ComponentId; +import com.yahoo.search.searchchain.model.federation.FederationOptions; + +import java.util.List; + +/** + * Specifices which search chain should be run and how it should be run. + * + * @author tonytv + */ +public class SearchChainInvocationSpec implements Cloneable { + public final ComponentId searchChainId; + + public final ComponentId source; + public final ComponentId provider; + + public final FederationOptions federationOptions; + public final List documentTypes; + + SearchChainInvocationSpec(ComponentId searchChainId, + ComponentId source, ComponentId provider, FederationOptions federationOptions, + List documentTypes) { + this.searchChainId = searchChainId; + this.source = source; + this.provider = provider; + this.federationOptions = federationOptions; + this.documentTypes = documentTypes; + } + + @Override + public SearchChainInvocationSpec clone() throws CloneNotSupportedException { + return (SearchChainInvocationSpec)super.clone(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java new file mode 100644 index 00000000000..fc70fb5e5e7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java @@ -0,0 +1,160 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.ComponentId; +import com.yahoo.component.ComponentSpecification; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.processing.request.Properties; +import com.yahoo.search.searchchain.model.federation.FederationOptions; + +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Resolves (source, provider) component specifications to a search chain invocation spec. + * The provider component specification is given by the entry in the queryMap with key + * 'source.<source-name>.provider'. + * + *

+ * The diagram shows the relationship between source, provider and the result: + * (source is used to select row, provider is used to select column.) + * Provider id = null is used for regular search chains. + *

+ * + *
+ *                   Provider id
+ *                 null
+ *                |----+---+---+---|
+ *                | o  |   |   |   |
+ *                |----+---+---+---|
+ * Source id      |    | o | o |   |
+ *                |----+---+---+---|
+ *                |    |   |   | o |
+ *                |----+---+---+---|
+ *
+ *                    o: SearchChainInvocationSpec
+ * 
+ * + * @author tonytv + */ +public class SearchChainResolver { + private final ComponentRegistry targets; + private final SortedSet defaultTargets; + + public static class Builder { + + private SortedSet defaultTargets = new TreeSet<>(); + + private final ComponentRegistry targets = new ComponentRegistry() { + @Override + public void freeze() { + for (Target target : allComponents()) { + target.freeze(); + } + super.freeze(); + } + }; + + public Builder addSearchChain(ComponentId searchChainId) { + return addSearchChain(searchChainId, Collections.emptyList()); + } + + public Builder addSearchChain(ComponentId searchChainId, FederationOptions federationOptions) { + return addSearchChain(searchChainId, federationOptions, Collections.emptyList()); + } + + public Builder addSearchChain(ComponentId searchChainId, List documentTypes) { + return addSearchChain(searchChainId, new FederationOptions(), documentTypes); + } + + public Builder addSearchChain(ComponentId searchChainId, FederationOptions federationOptions, + List documentTypes) { + registerTarget(new SingleTarget(searchChainId, + new SearchChainInvocationSpec(searchChainId, null, null, federationOptions, documentTypes), false)); + return this; + } + + private Builder registerTarget(SingleTarget singleTarget) { + targets.register(singleTarget.getId(), singleTarget); + if (singleTarget.useByDefault()) { + defaultTargets.add(singleTarget); + } + return this; + } + + public Builder addSourceForProvider(ComponentId sourceId, ComponentId providerId, ComponentId searchChainId, + boolean isDefaultProviderForSource, FederationOptions federationOptions, + List documentTypes) { + + SearchChainInvocationSpec searchChainInvocationSpec = + new SearchChainInvocationSpec(searchChainId, sourceId, providerId, federationOptions, documentTypes); + + SourcesTarget sourcesTarget = getOrRegisterSourceTarget(sourceId); + sourcesTarget.addSource(providerId, searchChainInvocationSpec, isDefaultProviderForSource); + + registerTarget(new SingleTarget(searchChainId, searchChainInvocationSpec, true)); + return this; + } + + private SourcesTarget getOrRegisterSourceTarget(ComponentId sourceId) { + Target sourcesTarget = targets.getComponent(sourceId); + if (sourcesTarget == null) { + targets.register(sourceId, new SourcesTarget(sourceId)); + return getOrRegisterSourceTarget(sourceId); + } else if (sourcesTarget instanceof SourcesTarget) { + return (SourcesTarget) sourcesTarget; + } else { + throw new IllegalStateException("Expected " + sourceId + " to be a source."); + } + } + + public void useTargetByDefault(String targetId) { + Target target = targets.getComponent(targetId); + assert target != null : "Target not added yet."; + + defaultTargets.add(target); + } + + public SearchChainResolver build() { + targets.freeze(); + return new SearchChainResolver(targets, defaultTargets); + } + } + + private SearchChainResolver(ComponentRegistry targets, SortedSet defaultTargets) { + this.targets = targets; + this.defaultTargets = Collections.unmodifiableSortedSet(defaultTargets); + } + + + public SearchChainInvocationSpec resolve(ComponentSpecification sourceRef, Properties sourceToProviderMap) + throws UnresolvedSearchChainException { + + Target target = resolveTarget(sourceRef); + return target.responsibleSearchChain(sourceToProviderMap); + } + + private Target resolveTarget(ComponentSpecification sourceRef) throws UnresolvedSearchChainException { + Target target = targets.getComponent(sourceRef); + if (target == null) { + throw UnresolvedSourceRefException.createForMissingSourceRef(sourceRef); + } + return target; + } + + public SortedSet allTopLevelTargets() { + SortedSet topLevelTargets = new TreeSet<>(); + for (Target target : targets.allComponents()) { + if (!target.isDerived) { + topLevelTargets.add(target); + } + } + return topLevelTargets; + } + + public SortedSet defaultTargets() { + return defaultTargets; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java new file mode 100644 index 00000000000..4210b56a501 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.ComponentId; +import com.yahoo.processing.request.Properties; + +/** + * TODO: What is this? + * +* @author tonytv +*/ +public class SingleTarget extends Target { + private final SearchChainInvocationSpec searchChainInvocationSpec; + + public SingleTarget(ComponentId id, SearchChainInvocationSpec searchChainInvocationSpec, boolean isDerived) { + super(id, isDerived); + this.searchChainInvocationSpec = searchChainInvocationSpec; + } + + @Override + public SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) { + return searchChainInvocationSpec; + } + + @Override + public String searchRefDescription() { + return localId.toString(); + } + + @Override + void freeze() {} + + public final boolean useByDefault() { + return searchChainInvocationSpec.federationOptions.getUseByDefault(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java new file mode 100644 index 00000000000..8de6635e517 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import static com.yahoo.container.util.Util.quote; + +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import com.yahoo.component.ComponentSpecification; +import com.yahoo.prelude.IndexFacts; +import com.yahoo.processing.request.Properties; + +/** + * Maps a source reference to search chain invocation specs. + * + * @author tonytv + */ +public class SourceRefResolver { + private final SearchChainResolver searchChainResolver; + + public SourceRefResolver(SearchChainResolver searchChainResolver) { + this.searchChainResolver = searchChainResolver; + } + public Set resolve(ComponentSpecification sourceRef, Properties sourceToProviderMap, + IndexFacts indexFacts) + throws UnresolvedSearchChainException { + + try { + return new LinkedHashSet<>(Arrays.asList(searchChainResolver.resolve(sourceRef, sourceToProviderMap))); + } catch (UnresolvedSourceRefException e) { + return resolveClustersWithDocument(sourceRef, sourceToProviderMap, indexFacts); + } + } + + private Set resolveClustersWithDocument(ComponentSpecification sourceRef, + Properties sourceToProviderMap, + IndexFacts indexFacts) + throws UnresolvedSearchChainException { + + if (hasOnlyName(sourceRef)) { + Set clusterSearchChains = new LinkedHashSet<>(); + + List clusters = indexFacts.clustersHavingSearchDefinition(sourceRef.getName()); + for (String cluster : clusters) { + clusterSearchChains.add(resolveClusterSearchChain(cluster, sourceRef, sourceToProviderMap)); + } + + if (!clusterSearchChains.isEmpty()) + return clusterSearchChains; + } + + throw UnresolvedSourceRefException.createForMissingSourceRef(sourceRef); + + } + + private SearchChainInvocationSpec resolveClusterSearchChain(String cluster, ComponentSpecification sourceRef, + Properties sourceToProviderMap) throws UnresolvedSearchChainException { + try { + return searchChainResolver.resolve(new ComponentSpecification(cluster), sourceToProviderMap); + } catch (UnresolvedSearchChainException e) { + throw new UnresolvedSearchChainException("Failed to resolve cluster search chain " + quote(cluster) + + " when using source ref " + quote(sourceRef) + " as a document name."); + } + } + + private boolean hasOnlyName(ComponentSpecification sourceSpec) { + return new ComponentSpecification(sourceSpec.getName()).equals(sourceSpec); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java new file mode 100644 index 00000000000..bb1de051ed0 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java @@ -0,0 +1,112 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + + +import com.google.common.base.Joiner; +import com.yahoo.component.ComponentId; +import com.yahoo.component.ComponentSpecification; +import com.yahoo.component.chain.model.ComponentAdaptor; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.processing.request.Properties; + +import java.util.ArrayList; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + + +public class SourcesTarget extends Target { + private ComponentRegistry> providerSources = + new ComponentRegistry>() {}; + private SearchChainInvocationSpec defaultProviderSource; + + public SourcesTarget(ComponentId sourceId) { + super(sourceId); + } + + @Override + public SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) throws UnresolvedSearchChainException { + ComponentSpecification providerSpecification = providerSpecificationForSource(queryProperties); + if (providerSpecification == null) { + return defaultProviderSource; + } else { + return lookupProviderSource(providerSpecification); + } + } + + @Override + public String searchRefDescription() { + StringBuilder builder = new StringBuilder(sourceId().stringValue()); + builder.append("[provider = "). + append(Joiner.on(", ").join(allProviderIdsStringValue())). + append("]"); + return builder.toString(); + } + + private SortedSet allProviderIdsStringValue() { + SortedSet result = new TreeSet<>(); + for (ComponentAdaptor providerSource : providerSources.allComponents()) { + result.add(providerSource.getId().stringValue()); + } + return result; + } + + private SearchChainInvocationSpec lookupProviderSource(ComponentSpecification providerSpecification) + throws UnresolvedSearchChainException { + ComponentAdaptor providerSource = providerSources.getComponent(providerSpecification); + + if (providerSource == null) + throw UnresolvedProviderException.createForMissingProvider(sourceId(), providerSpecification); + + return providerSource.model; + } + + public void freeze() { + if (defaultProviderSource == null) + throw new RuntimeException("Null default provider source for source " + sourceId() + "."); + + providerSources.freeze(); + } + + public void addSource(ComponentId providerId, SearchChainInvocationSpec searchChainInvocationSpec, + boolean isDefaultProviderForSource) { + providerSources.register(providerId, new ComponentAdaptor<>(providerId, searchChainInvocationSpec)); + + if (isDefaultProviderForSource) { + setDefaultProviderSource(searchChainInvocationSpec); + } + } + + private void setDefaultProviderSource(SearchChainInvocationSpec searchChainInvocationSpec) { + if (defaultProviderSource != null) + throw new RuntimeException("Tried to set two default providers for source " + sourceId() + "."); + + defaultProviderSource = searchChainInvocationSpec; + } + + ComponentId sourceId() { + return localId; + } + + + /** + * Looks up source.(sourceId).provider in the query properties. + * @return null if the default provider should be used + */ + private ComponentSpecification providerSpecificationForSource(Properties queryProperties) { + String spec = queryProperties.getString("source." + sourceId().stringValue() + ".provider"); + return ComponentSpecification.fromString(spec); + } + + public SearchChainInvocationSpec defaultProviderSource() { + return defaultProviderSource; + } + + public List allProviderSources() { + List allProviderSources = new ArrayList<>(); + for (ComponentAdaptor component : providerSources.allComponents()) { + allProviderSources.add(component.model); + } + return allProviderSources; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java new file mode 100644 index 00000000000..4cf5d406959 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.ComponentId; +import com.yahoo.processing.request.Properties; + +/** + * TODO: What's this? + * +* @author tonytv +*/ +public abstract class Target extends AbstractComponent { + final ComponentId localId; + final boolean isDerived; + + Target(ComponentId localId, boolean derived) { + super(localId); + this.localId = localId; + isDerived = derived; + } + + Target(ComponentId localId) { + this(localId, false); + } + + public abstract SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) throws UnresolvedSearchChainException; + public abstract String searchRefDescription(); + + abstract void freeze(); +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java new file mode 100644 index 00000000000..50b2dc95660 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.ComponentId; +import com.yahoo.component.ComponentSpecification; + +import static com.yahoo.container.util.Util.quote; + +/** + * @author tonytv + */ +@SuppressWarnings("serial") +class UnresolvedProviderException extends UnresolvedSearchChainException { + UnresolvedProviderException(String msg) { + super(msg); + } + + static UnresolvedSearchChainException createForMissingProvider(ComponentId source, + ComponentSpecification provider) { + return new UnresolvedProviderException("No provider " + quote(provider) + " for source " + quote(source) + "."); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java new file mode 100644 index 00000000000..b8417a3d05a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java @@ -0,0 +1,13 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +/** + * Thrown if a search chain can not be resolved from one or more ids. + * @author tonytv + */ +@SuppressWarnings("serial") +public class UnresolvedSearchChainException extends Exception { + public UnresolvedSearchChainException(String msg) { + super(msg); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java new file mode 100644 index 00000000000..4c15366914b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.sourceref; + +import com.yahoo.component.ComponentSpecification; + +import static com.yahoo.container.util.Util.quote; + +/** + * @author tonytv + */ +@SuppressWarnings("serial") +class UnresolvedSourceRefException extends UnresolvedSearchChainException { + UnresolvedSourceRefException(String msg) { + super(msg); + } + + + static UnresolvedSearchChainException createForMissingSourceRef(ComponentSpecification source) { + return new UnresolvedSourceRefException("Could not resolve source ref " + quote(source) + "."); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java new file mode 100644 index 00000000000..554424c267f --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java @@ -0,0 +1,170 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.vespa; + +import java.util.Iterator; + +import com.yahoo.prelude.query.*; + +/** + * Marshal a query stack into an advanced query string suitable for + * passing to another QRS. + * + * @author Steinar Knutsen + * @author Rong-En Fan + */ +public class QueryMarshaller { + private boolean atRoot = true; + + public String marshal(Item root) { + if (root == null || root instanceof NullItem) { + return null; + } + StringBuilder s = new StringBuilder(); + marshal(root, s); + atRoot = true; + return s.toString(); + } + + /** + * We do not yet care about exact match indices + */ + private void marshal(Item root, StringBuilder s) { + switch (root.getItemType()) { + case OR: + marshalOr((OrItem) root, s); + break; + case AND: + marshalAnd((CompositeItem) root, s); + break; + case NOT: + marshalNot((NotItem) root, s); + break; + case RANK: + marshalRank((RankItem) root, s); + break; + case WORD: + case INT: + case PREFIX: + case SUBSTRING: + case SUFFIX: + marshalWord((TermItem) root, s); + break; + case PHRASE: + // PhraseItem and PhraseSegmentItem don't add quotes for segmented + // termse + if (root instanceof PhraseSegmentItem) { + marshalPhrase((PhraseSegmentItem) root, s); + } else { + marshalPhrase((PhraseItem) root, s); + } + break; + case NEAR: + marshalNear((NearItem) root, s); + break; + case ONEAR: + marshalNear((ONearItem) root, s); + break; + case WEAK_AND: + marshalWeakAnd((WeakAndItem)root, s); + default: + break; + } + } + + + private void marshalWord(TermItem item, StringBuilder s) { + String index = item.getIndexName(); + if (index.length() != 0) { + s.append(item.getIndexName()).append(':'); + } + s.append(item.stringValue()); + if (item.getWeight() != Item.DEFAULT_WEIGHT) + s.append("!").append(item.getWeight()); + } + + private void marshalRank(RankItem root, StringBuilder s) { + marshalComposite("RANK", root, s); + } + + private void marshalNot(NotItem root, StringBuilder s) { + marshalComposite("ANDNOT", root, s); + } + + private void marshalOr(OrItem root, StringBuilder s) { + marshalComposite("OR", root, s); + } + + /** + * Dump WORD items, and add space between each of them unless those + * words came from segmentation. + * + * @param root CompositeItem + * @param s current marshaled query + */ + private void dumpWords(CompositeItem root, StringBuilder s) { + for (Iterator i = root.getItemIterator(); i.hasNext();) { + Item word = i.next(); + boolean useSeparator = true; + if (word instanceof TermItem) { + s.append(((TermItem) word).stringValue()); + if (word instanceof WordItem) { + useSeparator = !((WordItem) word).isFromSegmented(); + } + } else { + dumpWords((CompositeItem) word, s); + } + if (useSeparator && i.hasNext()) { + s.append(' '); + } + } + } + + private void marshalPhrase(PhraseItem root, StringBuilder s) { + marshalPhrase(root, s, root.isExplicit(), false); + } + + private void marshalPhrase(PhraseSegmentItem root, StringBuilder s) { + marshalPhrase(root, s, root.isExplicit(), true); + } + + private void marshalPhrase(IndexedItem root, StringBuilder s, boolean isExplicit, boolean isSegmented) { + String index = root.getIndexName(); + if (index.length() != 0) { + s.append(root.getIndexName()).append(':'); + } + if (isExplicit || !isSegmented) s.append('"'); + dumpWords((CompositeItem) root, s); + if (isExplicit || !isSegmented) s.append('"'); + } + + private void marshalNear(NearItem root, StringBuilder s) { + marshalComposite(root.getName() + "(" + root.getDistance() + ")", root, s); + } + + // Not only AndItem returns ItemType.AND + private void marshalAnd(CompositeItem root, StringBuilder s) { + marshalComposite("AND", root, s); + } + + private void marshalWeakAnd(WeakAndItem root, StringBuilder s) { + marshalComposite("WAND(" + root.getN() + ")", root, s); + } + + private void marshalComposite(String operator, CompositeItem root, StringBuilder s) { + boolean useParen = !atRoot; + if (useParen) { + s.append("( "); + } else { + atRoot = false; + } + for (Iterator i = root.getItemIterator(); i.hasNext();) { + Item item = i.next(); + marshal(item, s); + if (i.hasNext()) + s.append(' ').append(operator).append(' '); + } + if (useParen) { + s.append(" )"); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java new file mode 100644 index 00000000000..1361c7c14db --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java @@ -0,0 +1,642 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.vespa; + +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.hitfield.XMLString; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.search.result.HitGroup; +import com.yahoo.search.result.Relevance; +import com.yahoo.text.XML; +import com.yahoo.text.DoubleParser; +import org.xml.sax.*; +import org.xml.sax.helpers.DefaultHandler; +import org.xml.sax.helpers.XMLReaderFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +import static com.yahoo.text.Lowercase.toLowerCase; + +/** + * Parse Vespa XML results and create Result instances. + * + *

TODO: Ripe for a rewrite or major refactoring. + * + * @author Steinar Knutsen + */ +@SuppressWarnings("deprecation") +public class ResultBuilder extends DefaultHandler { + private static final String ERROR = "error"; + + private static final String FIELD = "field"; + + private static Logger log = Logger.getLogger(ResultBuilder.class.getName()); + + /** Namespaces feature id (http://xml.org/sax/features/namespaces). */ + protected static final String NAMESPACES_FEATURE_ID = "http://xml.org/sax/features/namespaces"; + + /** + * Namespace prefixes feature id + * (http://xml.org/sax/features/namespace-prefixes). + */ + protected static final String NAMESPACE_PREFIXES_FEATURE_ID = "http://xml.org/sax/features/namespace-prefixes"; + + /** Validation feature id (http://xml.org/sax/features/validation). */ + protected static final String VALIDATION_FEATURE_ID = "http://xml.org/sax/features/validation"; + + /** + * Schema validation feature id + * (http://apache.org/xml/features/validation/schema). + */ + protected static final String SCHEMA_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/schema"; + + /** + * Dynamic validation feature id + * (http://apache.org/xml/features/validation/dynamic). + */ + protected static final String DYNAMIC_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/dynamic"; + + // default settings + + /** Default parser name. */ + protected static final String DEFAULT_PARSER_NAME = "org.apache.xerces.parsers.SAXParser"; + + /** Default namespaces support (false). */ + protected static final boolean DEFAULT_NAMESPACES = false; + + /** Default namespace prefixes (false). */ + protected static final boolean DEFAULT_NAMESPACE_PREFIXES = false; + + /** Default validation support (false). */ + protected static final boolean DEFAULT_VALIDATION = false; + + /** Default Schema validation support (false). */ + protected static final boolean DEFAULT_SCHEMA_VALIDATION = false; + + /** Default dynamic validation support (false). */ + protected static final boolean DEFAULT_DYNAMIC_VALIDATION = false; + + private StringBuilder fieldContent; + + private String fieldName; + + private int fieldLevel = 0; + + private boolean hasLiteralTags = false; + + private Map hitFields = new HashMap<>(); + private String hitType; + private String hitRelevance; + private String hitSource; + + private int offset = 0; + + private List tagStack = new ArrayList<>(); + + private final XMLReader parser; + + private Query query; + + private Result result; + + private static enum ResultPart { + ROOT, ERRORDETAILS, HIT, HITGROUP; + } + + Deque location = new ArrayDeque<>(10); + + private String currentErrorCode; + + private String currentError; + + private Deque hitGroups = new ArrayDeque<>(5); + + private static class Tag { + public final String name; + + /** + * Offset is a number which is generated for all data and tags inside + * fields, used to determine whether a tag was closed without enclosing + * any characters or other tags. + */ + public final int offset; + + public Tag(final String name, final int offset) { + this.name = name; + this.offset = offset; + } + + @Override + public String toString() { + return name + '(' + Integer.valueOf(offset) + ')'; + } + } + + /** Default constructor. */ + public ResultBuilder() throws RuntimeException { + this(createParser()); + } + + public ResultBuilder(XMLReader parser) { + this.parser = parser; + this.parser.setContentHandler(this); + this.parser.setErrorHandler(this); + } + + public static XMLReader createParser() { + ClassLoader savedContextClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(org.apache.xerces.parsers.SAXParser.class.getClassLoader()); + + try { + XMLReader reader = XMLReaderFactory.createXMLReader(DEFAULT_PARSER_NAME); + setParserFeatures(reader); + return reader; + } catch (Exception e) { + throw new RuntimeException("error: Unable to instantiate parser (" + + DEFAULT_PARSER_NAME + ")", e); + } finally { + Thread.currentThread().setContextClassLoader(savedContextClassLoader); + } + } + + private static void setParserFeatures(XMLReader reader) { + try { + reader.setFeature(NAMESPACES_FEATURE_ID, DEFAULT_NAMESPACES); + } catch (SAXException e) { + log.log(LogLevel.WARNING, "warning: Parser does not support feature (" + + NAMESPACES_FEATURE_ID + ")"); + } + try { + reader.setFeature(NAMESPACE_PREFIXES_FEATURE_ID, + DEFAULT_NAMESPACE_PREFIXES); + } catch (SAXException e) { + log.log(LogLevel.WARNING, "warning: Parser does not support feature (" + + NAMESPACE_PREFIXES_FEATURE_ID + ")"); + } + try { + reader.setFeature(VALIDATION_FEATURE_ID, DEFAULT_VALIDATION); + } catch (SAXException e) { + log.log(LogLevel.WARNING, "warning: Parser does not support feature (" + + VALIDATION_FEATURE_ID + ")"); + } + try { + reader.setFeature(SCHEMA_VALIDATION_FEATURE_ID, + DEFAULT_SCHEMA_VALIDATION); + } catch (SAXNotRecognizedException e) { + log.log(LogLevel.WARNING, "warning: Parser does not recognize feature (" + + SCHEMA_VALIDATION_FEATURE_ID + ")"); + + } catch (SAXNotSupportedException e) { + log.log(LogLevel.WARNING, "warning: Parser does not support feature (" + + SCHEMA_VALIDATION_FEATURE_ID + ")"); + } + + try { + reader.setFeature(DYNAMIC_VALIDATION_FEATURE_ID, + DEFAULT_DYNAMIC_VALIDATION); + } catch (SAXNotRecognizedException e) { + log.log(LogLevel.WARNING, "warning: Parser does not recognize feature (" + + DYNAMIC_VALIDATION_FEATURE_ID + ")"); + + } catch (SAXNotSupportedException e) { + log.log(LogLevel.WARNING, "warning: Parser does not support feature (" + + DYNAMIC_VALIDATION_FEATURE_ID + ")"); + } + } + + @Override + public void startDocument() throws SAXException { + reset(); + result = new Result(query); + hitGroups.addFirst(result.hits()); + location.addFirst(ResultPart.ROOT); + return; + } + + private void reset() { + result = null; + fieldLevel = 0; + hasLiteralTags = false; + tagStack = null; + fieldContent = null; + offset = 0; + currentError = null; + currentErrorCode = null; + hitGroups.clear(); + location.clear(); + } + + @Override + public void startElement(String uri, String local, String raw, + Attributes attrs) throws SAXException { + // "Everybody" wants this switch to be moved into the + // enum class instead, but in this case, I find the classic + // approach more readable. + switch (location.peekFirst()) { + case HIT: + if (fieldLevel > 0) { + tagInField(raw, attrs, FIELD); + ++offset; + return; + } + if (FIELD.equals(raw)) { + ++fieldLevel; + fieldName = attrs.getValue("name"); + fieldContent = new StringBuilder(); + hasLiteralTags = false; + } + break; + case ERRORDETAILS: + if (fieldLevel > 0) { + tagInField(raw, attrs, ERROR); + ++offset; + return; + } + if (ERROR.equals(raw)) { + if (attrs != null) { + currentErrorCode = attrs.getValue("code"); + currentError = attrs.getValue("error"); + } + ++fieldLevel; + fieldContent = new StringBuilder(); + hasLiteralTags = false; + } + break; + case HITGROUP: + if ("hit".equals(raw)) { + startHit(attrs); + } else if ("group".equals(raw)) { + startHitGroup(attrs); + } + break; + case ROOT: + if ("hit".equals(raw)) { + startHit(attrs); + } else if ("errordetails".equals(raw)) { + location.addFirst(ResultPart.ERRORDETAILS); + } else if ("result".equals(raw)) { + if (attrs != null) { + String total = attrs.getValue("total-hit-count"); + if (total != null) { + result.setTotalHitCount(Long.valueOf(total)); + } + } + } else if ("group".equals(raw)) { + startHitGroup(attrs); + } else if (ERROR.equals(raw)) { + if (attrs != null) { + currentErrorCode = attrs.getValue("code"); + fieldContent = new StringBuilder(); + } + } + break; + } + ++offset; + } + + private void startHitGroup(Attributes attrs) { + HitGroup g = new HitGroup(); + Set types = g.types(); + + final String source; + if (attrs != null) { + String groupType = attrs.getValue("type"); + if (groupType != null) { + for (String s : groupType.split(" ")) { + if (s.length() > 0) { + types.add(s); + } + } + } + + source = attrs.getValue("source"); + } else { + source = null; + } + + g.setId((source != null) ? source : "dummy"); + + hitGroups.peekFirst().add(g); + hitGroups.addFirst(g); + location.addFirst(ResultPart.HITGROUP); + } + + private void startHit(Attributes attrs) { + hitFields.clear(); + location.addFirst(ResultPart.HIT); + if (attrs != null) { + hitRelevance = attrs.getValue("relevancy"); + hitSource = attrs.getValue("source"); + hitType = attrs.getValue("type"); + } else { + hitRelevance = null; + hitSource = null; + hitType = null; + } + } + + private void tagInField(String tag, Attributes attrs, String enclosingTag) { + if (!hasLiteralTags) { + hasLiteralTags = true; + String fieldTillNow = XML.xmlEscape(fieldContent.toString(), false); + fieldContent = new StringBuilder(fieldTillNow); + tagStack = new ArrayList<>(); + } + if (enclosingTag.equals(tag)) { + ++fieldLevel; + } + if (tagStack.size() > 0) { + Tag prevTag = tagStack.get(tagStack.size() - 1); + if (prevTag != null && (prevTag.offset + 1) == offset) { + fieldContent.append(">"); + } + } + fieldContent.append("<").append(tag); + if (attrs != null) { + int attrCount = attrs.getLength(); + for (int i = 0; i < attrCount; i++) { + fieldContent.append(" ").append(attrs.getQName(i)) + .append("=\"").append( + XML.xmlEscape(attrs.getValue(i), true)).append( + "\""); + } + } + tagStack.add(new Tag(tag, offset)); + } + + private void endElementInField(String qName, String enclosingTag) { + Tag prevTag = tagStack.get(tagStack.size() - 1); + if (qName.equals(prevTag.name) && offset == (prevTag.offset + 1)) { + fieldContent.append(" />"); + } else { + fieldContent.append("'); + } + if (prevTag.name.equals(qName)) { + tagStack.remove(tagStack.size() - 1); + } + } + + private void endElementInHitField(String qName) { + if (FIELD.equals(qName) && --fieldLevel == 0) { + Object content; + if (hasLiteralTags) { + content = new XMLString(fieldContent.toString()); + } else { + content = fieldContent.toString(); + } + hitFields.put(fieldName, content); + if ("collapseId".equals(fieldName)) { + hitFields.put(fieldName, Integer.valueOf(content.toString())); + } + fieldName = null; + fieldContent = null; + tagStack = null; + } else { + Tag prevTag = tagStack.get(tagStack.size() - 1); + if (qName.equals(prevTag.name) && offset == (prevTag.offset + 1)) { + fieldContent.append(" />"); + } else { + fieldContent.append("'); + } + if (prevTag.name.equals(qName)) { + tagStack.remove(tagStack.size() - 1); + } + } + } + @Override + public void characters(char ch[], int start, int length) + throws SAXException { + + switch (location.peekFirst()) { + case ERRORDETAILS: + case HIT: + if (fieldLevel > 0) { + if (hasLiteralTags) { + if (tagStack.size() > 0) { + Tag tag = tagStack.get(tagStack.size() - 1); + if (tag != null && (tag.offset + 1) == offset) { + fieldContent.append(">"); + } + } + fieldContent.append( + XML.xmlEscape(new String(ch, start, length), false)); + } else { + fieldContent.append(ch, start, length); + } + } + break; + default: + if (fieldContent != null) { + fieldContent.append(ch, start, length); + } + break; + } + ++offset; + } + + @Override + public void ignorableWhitespace(char ch[], int start, int length) + throws SAXException { + return; + } + + @Override + public void processingInstruction(String target, String data) + throws SAXException { + return; + } + + @Override + public void endElement(String namespaceURI, String localName, String qName) + throws SAXException { + switch (location.peekFirst()) { + case HITGROUP: + if ("group".equals(qName)) { + hitGroups.removeFirst(); + location.removeFirst(); + } + break; + case HIT: + if (fieldLevel > 0) { + endElementInHitField(qName); + } else if ("hit".equals(qName)) { + //assert(hitKeys.size() == hitValues.size()); + //We try to get either uri or documentID and use that as id + Object docId = extractDocumentID(); + Hit newHit = new Hit(docId.toString()); + if (hitRelevance != null) newHit.setRelevance(new Relevance(DoubleParser.parse(hitRelevance))); + if(hitSource != null) newHit.setSource(hitSource); + if(hitType != null) { + for(String type: hitType.split(" ")) { + newHit.types().add(type); + } + } + for(Map.Entry field : hitFields.entrySet()) { + newHit.setField(field.getKey(), field.getValue()); + } + + hitGroups.peekFirst().add(newHit); + location.removeFirst(); + } + break; + case ERRORDETAILS: + if (fieldLevel == 1 && ERROR.equals(qName)) { + ErrorMessage error = new ErrorMessage(Integer.valueOf(currentErrorCode), + currentError, + fieldContent.toString()); + hitGroups.peekFirst().addError(error); + currentError = null; + currentErrorCode = null; + fieldContent = null; + tagStack = null; + fieldLevel = 0; + } else if (fieldLevel > 0) { + endElementInField(qName, ERROR); + } else if ("errordetails".equals(qName)) { + location.removeFirst(); + } + break; + case ROOT: + if (ERROR.equals(qName)) { + ErrorMessage error = new ErrorMessage(Integer.valueOf(currentErrorCode), + fieldContent.toString()); + hitGroups.peekFirst().setError(error); + currentErrorCode = null; + fieldContent = null; + } + break; + default: + break; + } + ++offset; + } + + private Object extractDocumentID() { + Object docId = null; + if (hitFields.containsKey("uri")) { + docId = hitFields.get("uri"); + } else { + final String documentId = "documentId"; + if (hitFields.containsKey(documentId)) { + docId = hitFields.get(documentId); + } else { + final String lcDocumentId = toLowerCase(documentId); + for (Map.Entry e : hitFields.entrySet()) { + String key = e.getKey(); + // case insensitive matching, checking length first hoping to avoid some lowercasing + if (documentId.length() == key.length() && lcDocumentId.equals(toLowerCase(key))) { + docId = e.getValue(); + break; + } + } + } + } + if (docId == null) { + docId = "dummy"; + log.info("Results from vespa backend did not contain either uri or documentId"); + } + return docId; + } + + @Override + public void warning(SAXParseException ex) throws SAXException { + printError("Warning", ex); + } + + @Override + public void error(SAXParseException ex) throws SAXException { + printError("Error", ex); + } + + @Override + public void fatalError(SAXParseException ex) throws SAXException { + printError("Fatal Error", ex); + // throw ex; + } + + /** Prints the error message. */ + protected void printError(String type, SAXParseException ex) { + StringBuilder errorMessage = new StringBuilder(); + + errorMessage.append(type); + if (ex != null) { + String systemId = ex.getSystemId(); + if (systemId != null) { + int index = systemId.lastIndexOf('/'); + if (index != -1) + systemId = systemId.substring(index + 1); + errorMessage.append(' ').append(systemId); + } + } + errorMessage.append(':') + .append(ex.getLineNumber()) + .append(':') + .append(ex.getColumnNumber()) + .append(": ") + .append(ex.getMessage()); + log.log(LogLevel.WARNING, errorMessage.toString()); + + } + + public Result parse(String identifier, Query query) { + Result toReturn; + + setQuery(query); + try { + parser.parse(identifier); + } catch (SAXParseException e) { + // ignore + } catch (Exception e) { + log.log(LogLevel.WARNING, "Error parsing result from Vespa",e); + Exception se = e; + if (e instanceof SAXException) { + se = ((SAXException) e).getException(); + } + if (se != null) + se.printStackTrace(System.err); + else + e.printStackTrace(System.err); + } + toReturn = result; + reset(); + return toReturn; + } + + public Result parse(InputSource input, Query query) { + Result toReturn; + + setQuery(query); + try { + parser.parse(input); + } catch (SAXParseException e) { + // ignore + } catch (Exception e) { + log.log(LogLevel.WARNING, "Error parsing result from Vespa",e); + Exception se = e; + if (e instanceof SAXException) { + se = ((SAXException) e).getException(); + } + if (se != null) + se.printStackTrace(System.err); + else + e.printStackTrace(System.err); + } + toReturn = result; + reset(); + return toReturn; + } + + + private void setQuery(Query query) { + this.query = query; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java new file mode 100644 index 00000000000..26c9b8ad2cd --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java @@ -0,0 +1,270 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.federation.vespa; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Map; +import java.util.Set; + +import org.xml.sax.InputSource; +import org.xml.sax.XMLReader; + +import com.google.inject.Inject; +import com.yahoo.collections.Tuple2; +import com.yahoo.component.ComponentId; +import com.yahoo.component.Version; +import com.yahoo.component.chain.dependencies.After; +import com.yahoo.component.chain.dependencies.Provides; +import com.yahoo.language.Linguistics; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.query.Item; +import com.yahoo.prelude.query.QueryCanonicalizer; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.cache.QrBinaryCacheConfig; +import com.yahoo.search.cache.QrBinaryCacheRegionConfig; +import com.yahoo.search.federation.FederationSearcher; +import com.yahoo.search.federation.ProviderConfig; +import com.yahoo.search.federation.http.ConfiguredHTTPProviderSearcher; +import com.yahoo.search.federation.http.Connection; +import com.yahoo.search.intent.model.IntentModel; +import com.yahoo.search.query.QueryTree; +import com.yahoo.search.query.textserialize.TextSerialize; +import com.yahoo.search.yql.MinimalQueryInserter; +import com.yahoo.statistics.Statistics; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Backend searcher for external Vespa clusters (queried over http). + * + *

If the "sources" argument should be honored on an external cluster + * when using YQL+, override {@link #chooseYqlSources(Set)}.

+ * + * @author Arne Bergene Fossaa + * @author Steinar Knutsen + */ +@Provides("Vespa") +@After("*") +public class VespaSearcher extends ConfiguredHTTPProviderSearcher { + private final ThreadLocal readerHolder = new ThreadLocal<>(); + private final Query.Type queryType; + private final Tuple2 segmenterVersion; + + private static final CompoundName select = new CompoundName("select"); + private static final CompoundName streamingUserid = new CompoundName( + "streaming.userid"); + private static final CompoundName streamingGroupname = new CompoundName( + "streaming.groupname"); + private static final CompoundName streamingSelection = new CompoundName( + "streaming.selection"); + + /** Create an instance from configuration */ + public VespaSearcher(ComponentId id, ProviderConfig config, + QrBinaryCacheConfig c, QrBinaryCacheRegionConfig r, + Statistics statistics) { + this(id, config, c, r, statistics, null); + } + + /** + * Create an instance from configuration + * + * @param linguistics used for generating meta info for YQL+ + */ + @Inject + public VespaSearcher(ComponentId id, ProviderConfig config, + QrBinaryCacheConfig c, QrBinaryCacheRegionConfig r, + Statistics statistics, @Nullable Linguistics linguistics) { + super(id, config, c, r, statistics); + queryType = toQueryType(config.queryType()); + if (linguistics == null) { + segmenterVersion = null; + } else { + segmenterVersion = linguistics.getVersion(Linguistics.Component.SEGMENTER); + } + } + + /** + * Create an instance from direct parameters having a single connection. + * Useful for testing + */ + public VespaSearcher(String idString, String host, int port, String path) { + super(idString, host, port, path, Statistics.nullImplementation); + queryType = toQueryType(ProviderConfig.QueryType.LEGACY); + segmenterVersion = null; + } + + void addProperty(Map queryMap, Query query, + CompoundName property) { + Object o = query.properties().get(property); + if (o != null) { + queryMap.put(property.toString(), o.toString()); + } + } + + @Override + public Map getQueryMap(Query query) { + Map queryMap = getQueryMapWithoutHitsOffset(query); + queryMap.put("offset", Integer.toString(query.getOffset())); + queryMap.put("hits", Integer.toString(query.getHits())); + queryMap.put("presentation.format", "xml"); + + addProperty(queryMap, query, select); + addProperty(queryMap, query, streamingUserid); + addProperty(queryMap, query, streamingGroupname); + addProperty(queryMap, query, streamingSelection); + return queryMap; + } + + @Override + public Map getCacheKey(Query q) { + return getQueryMapWithoutHitsOffset(q); + } + + private Map getQueryMapWithoutHitsOffset(Query query) { + Map queryMap = super.getQueryMap(query); + if (queryType == Query.Type.YQL) { + queryMap.put(MinimalQueryInserter.YQL.toString(), marshalQuery(query)); + } else { + queryMap.put("query", marshalQuery(query.getModel().getQueryTree())); + queryMap.put("type", queryType.toString()); + } + + addNonExcludedSourceProperties(query, queryMap); + return queryMap; + } + + Query.Type toQueryType(ProviderConfig.QueryType.Enum providerQueryType) { + if (providerQueryType == ProviderConfig.QueryType.LEGACY) { + return Query.Type.ADVANCED; + } else if (providerQueryType == ProviderConfig.QueryType.PROGRAMMATIC) { + return Query.Type.PROGRAMMATIC; + } else if (providerQueryType == ProviderConfig.QueryType.YQL) { + return Query.Type.YQL; + } else { + throw new RuntimeException("Query type " + providerQueryType + + " unsupported."); + } + } + + /** + * Serialize the query parameter for outgoing queries. For YQL+ queries, + * sources and fields will be set to all sources and all fields, to follow + * the behavior of other query types. + * + * @param query + * the current, outgoing query + * @return a string to include in an HTTP request + */ + public String marshalQuery(Query query) { + if (queryType != Query.Type.YQL) { + return marshalQuery(query.getModel().getQueryTree()); + } + + Query workQuery = query.clone(); + String error = QueryCanonicalizer.canonicalize(workQuery); + if (error != null) { + getLogger().log(LogLevel.WARNING, + "Could not normalize [" + query.toString() + "]: " + error); + // Just returning null here is the pattern from existing code... + return null; + } + chooseYqlSources(workQuery.getModel().getSources()); + chooseYqlSummaryFields(workQuery.getPresentation().getSummaryFields()); + return workQuery.yqlRepresentation(getSegmenterVersion(), false); + } + + public String marshalQuery(QueryTree root) { + QueryCanonicalizer.QueryWrapper qw = new QueryCanonicalizer.QueryWrapper(); + root = root.clone(); + qw.setRoot(root.getRoot()); + boolean could = QueryCanonicalizer.treeCanonicalize(qw, root.getRoot(), + null); + if (!could) { + return null; + } + return marshalRoot(qw.getRoot()); + } + + private String marshalRoot(Item root) { + switch (queryType) { + case ADVANCED: + QueryMarshaller marshaller = new QueryMarshaller(); + return marshaller.marshal(root); + case PROGRAMMATIC: + return TextSerialize.serialize(root); + default: + throw new RuntimeException("Unsupported query type."); + } + } + + private XMLReader getReader() { + XMLReader reader = readerHolder.get(); + if (reader == null) { + reader = ResultBuilder.createParser(); + readerHolder.set(reader); + } + return reader; + } + + @Override + public void unmarshal(InputStream stream, long contentLength, Result result) { + ResultBuilder parser = new ResultBuilder(getReader()); + Result mResult = parser.parse(new InputSource(stream), + result.getQuery()); + result.mergeWith(mResult); + result.hits().addAll(mResult.hits().asUnorderedHits()); + } + + /** Returns the canonical Vespa ping URI, http://host:port/status.html */ + @Override + public URI getPingURI(Connection connection) throws MalformedURLException, + URISyntaxException { + return new URL(getParameters().getSchema(), connection.getHost(), + connection.getPort(), "/status.html").toURI(); + } + + /** + * Get the segmenter version data used when creating YQL queries. Useful if + * overriding {@link #marshalQuery(Query)}. + * + * @return a tuple with the name of the segmenting engine in use, and its + * version + */ + protected Tuple2 getSegmenterVersion() { + return segmenterVersion; + } + + /** + * Choose which source arguments to use for the external cluster when + * generating a YQL+ query string. This is called from + * {@link #marshalQuery(Query)}. The default implementation clears the set, + * i.e. requests all sources. Other implementations may modify the source + * set as they see fit, or simply do nothing. + * + * @param sources + * the set of source names to use for the outgoing query + */ + protected void chooseYqlSources(Set sources) { + sources.clear(); + } + + /** + * Choose which summary fields to request from the external cluster when + * generating a YQL+ query string. This is called from + * {@link #marshalQuery(Query)}. The default implementation clears the set, + * i.e. requests all fields. Other implementations may modify the summary + * field set as they see fit, or simply do nothing. + * + * @param summaryFields + * the set of source names to use for the outgoing query + */ + protected void chooseYqlSummaryFields(Set summaryFields) { + summaryFields.clear(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java new file mode 100644 index 00000000000..6a9f1decb21 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.search.federation.vespa; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; -- cgit v1.2.3