diff options
author | Jon Bratseth <bratseth@gmail.com> | 2022-06-06 16:35:20 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2022-06-08 11:45:30 +0200 |
commit | 3c11ceb432b5bcb7639758330fad5433ec03f264 (patch) | |
tree | 6ba2c384ec40f377ce370b9a0dcaeaedac128067 /vespa-hadoop | |
parent | 22f76eae5d33da3c4acea18b827873b6ec36ef61 (diff) |
Remove http client use
Diffstat (limited to 'vespa-hadoop')
6 files changed, 2 insertions, 318 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 0c724c95839..1dde2d58610 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -136,11 +136,6 @@ <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> - <artifactId>vespa-http-client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-feed-client</artifactId> <version>${project.version}</version> </dependency> diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java deleted file mode 100644 index 6900c7dc82f..00000000000 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hadoop.mapreduce; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.FeedParams; -import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; -import com.yahoo.vespa.http.client.config.SessionParams; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import javax.xml.namespace.QName; -import javax.xml.stream.FactoryConfigurationError; -import javax.xml.stream.XMLEventReader; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.StartElement; -import javax.xml.stream.events.XMLEvent; -import java.io.IOException; -import java.io.StringReader; -import java.time.Duration; -import java.util.List; -import java.util.StringTokenizer; -import java.util.concurrent.ThreadLocalRandom; -import java.util.logging.Logger; - -/** - * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. - * - * @author lesters - * @deprecated Replaced by {@link VespaRecordWriter} - */ -@Deprecated -public class LegacyVespaRecordWriter extends RecordWriter<Object, Object> { - - private final Logger log = Logger.getLogger(getClass().getCanonicalName()); - - private boolean initialized = false; - private com.yahoo.vespa.http.client.FeedClient feedClient; - private final VespaCounters counters; - private final int progressInterval; - - final VespaConfiguration configuration; - - LegacyVespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { - this.counters = counters; - this.configuration = configuration; - this.progressInterval = configuration.progressInterval(); - } - - - @Override - public void write(Object key, Object data) throws IOException, InterruptedException { - if (!initialized) { - initialize(); - } - - String doc = data.toString().trim(); - - // Parse data to find document id - if none found, skip this write - String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) - : findDocIdFromXml(doc); - if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, doc); - counters.incrementDocumentsSent(1); - } else { - counters.incrementDocumentsSkipped(1); - } - - if (counters.getDocumentsSent() % progressInterval == 0) { - String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", - counters.getDocumentsSent(), - counters.getDocumentsOk(), - counters.getDocumentsFailed(), - counters.getDocumentsSkipped()); - log.info(progress); - } - - } - - - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - if (feedClient != null) { - feedClient.close(); - } - } - - protected ConnectionParams.Builder configureConnectionParams() { - ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); - connParamsBuilder.setDryRun(configuration.dryrun()); - connParamsBuilder.setUseCompression(configuration.useCompression()); - connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); - connParamsBuilder.setMaxRetries(configuration.numRetries()); - if (configuration.proxyHost() != null) { - connParamsBuilder.setProxyHost(configuration.proxyHost()); - } - if (configuration.proxyPort() >= 0) { - connParamsBuilder.setProxyPort(configuration.proxyPort()); - } - return connParamsBuilder; - } - - protected FeedParams.Builder configureFeedParams() { - FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); - feedParamsBuilder.setDataFormat(configuration.dataFormat()); - feedParamsBuilder.setRoute(configuration.route()); - feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); - feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); - feedParamsBuilder.setLocalQueueTimeOut(Duration.ofMinutes(10).toMillis()); - return feedParamsBuilder; - } - - protected SessionParams.Builder configureSessionParams() { - SessionParams.Builder sessionParamsBuilder = new SessionParams.Builder(); - sessionParamsBuilder.setThrottlerMinSize(configuration.throttlerMinSize()); - sessionParamsBuilder.setClientQueueSize(configuration.maxInFlightRequests()*2); - return sessionParamsBuilder; - } - - private void initialize() { - if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) { - int delay = ThreadLocalRandom.current().nextInt(configuration.randomStartupSleepMs()); - log.info("VespaStorage: Delaying startup by " + delay + " ms"); - try { - Thread.sleep(delay); - } catch (Exception e) {} - } - - ConnectionParams.Builder connParamsBuilder = configureConnectionParams(); - FeedParams.Builder feedParamsBuilder = configureFeedParams(); - SessionParams.Builder sessionParams = configureSessionParams(); - - sessionParams.setConnectionParams(connParamsBuilder.build()); - sessionParams.setFeedParams(feedParamsBuilder.build()); - - String endpoints = configuration.endpoint(); - StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); - while (tokenizer.hasMoreTokens()) { - String endpoint = tokenizer.nextToken().trim(); - sessionParams.addCluster(new Cluster.Builder().addEndpoint( - Endpoint.create(endpoint, configuration.defaultPort(), configuration.useSSL().orElse(false)) - ).build()); - } - - ResultCallback resultCallback = new ResultCallback(counters); - feedClient = com.yahoo.vespa.http.client.FeedClientFactory.create(sessionParams.build(), resultCallback); - - initialized = true; - log.info("VespaStorage configuration:\n" + configuration.toString()); - log.info(feedClient.getStatsAsJson()); - } - - private String findDocIdFromXml(String xml) { - try { - XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); - while (eventReader.hasNext()) { - XMLEvent event = eventReader.nextEvent(); - if (event.getEventType() == XMLEvent.START_ELEMENT) { - StartElement element = event.asStartElement(); - String elementName = element.getName().getLocalPart(); - if (VespaDocumentOperation.Operation.valid(elementName)) { - return element.getAttributeByName(QName.valueOf("documentid")).getValue(); - } - } - } - } catch (XMLStreamException | FactoryConfigurationError e) { - // as json dude does - return null; - } - return null; - } - - private String findDocId(String json) throws IOException { - JsonFactory factory = new JsonFactory(); - try(JsonParser parser = factory.createParser(json)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String fieldName = parser.getCurrentName(); - parser.nextToken(); - if (VespaDocumentOperation.Operation.valid(fieldName)) { - String docId = parser.getText(); - return docId; - } else { - parser.skipChildren(); - } - } - } catch (JsonParseException ex) { - return null; - } - return null; - } - - - class ResultCallback implements com.yahoo.vespa.http.client.FeedClient.ResultCallback { - final VespaCounters counters; - - public ResultCallback(VespaCounters counters) { - this.counters = counters; - } - - @Override - public void onCompletion(String docId, com.yahoo.vespa.http.client.Result documentResult) { - if (!documentResult.isSuccess()) { - counters.incrementDocumentsFailed(1); - StringBuilder sb = new StringBuilder(); - sb.append("Problems with docid "); - sb.append(docId); - sb.append(": "); - List<com.yahoo.vespa.http.client.Result.Detail> details = documentResult.getDetails(); - for (com.yahoo.vespa.http.client.Result.Detail detail : details) { - sb.append(detail.toString()); - sb.append(" "); - } - log.warning(sb.toString()); - return; - } - counters.incrementDocumentsOk(1); - } - - } - -} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index 66ab94574d9..42eb6293eee 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -10,24 +10,16 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; -import java.util.Objects; import java.util.Properties; -import java.util.logging.Logger; - -import static com.yahoo.vespa.http.client.config.FeedParams.DataFormat.XML_UTF8; /** * An output specification for writing to Vespa instances in a Map-Reduce job. - * Mainly returns an instance of a {@link LegacyVespaRecordWriter} that does the - * actual feeding to Vespa. * * @author lesters */ @SuppressWarnings("rawtypes") public class VespaOutputFormat extends OutputFormat { - private static final Logger log = Logger.getLogger(VespaOutputFormat.class.getName()); - final Properties configOverride; public VespaOutputFormat() { @@ -42,18 +34,10 @@ public class VespaOutputFormat extends OutputFormat { @Override - @SuppressWarnings("deprecation") public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException { VespaCounters counters = VespaCounters.get(context); VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); - Boolean useLegacyClient = configuration.useLegacyClient().orElse(null); - if (Objects.equals(useLegacyClient, Boolean.TRUE) || configuration.dataFormat() == XML_UTF8) { - log.warning("Feeding with legacy client or XML will no longer be supported on Vespa 8. " + - "See https://docs.vespa.ai/en/vespa8-release-notes.html"); - return new LegacyVespaRecordWriter(configuration, counters); - } else { - return new VespaRecordWriter(configuration, counters); - } + return new VespaRecordWriter(configuration, counters); } diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 6d6c3789835..c450d7cdeef 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -7,7 +7,6 @@ import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -85,18 +84,11 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> { private void initializeOnFirstWrite() { if (initialized) return; - validateConfig(); useRandomizedStartupDelayIfEnabled(); feeder = createJsonStreamFeeder(); initialized = true; } - private void validateConfig() { - if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { - throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); - } - } - private void useRandomizedStartupDelayIfEnabled() { if (!config.dryrun() && config.randomStartupSleepMs() > 0) { int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index 715546fe6fe..54be261fbe7 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce.util; -import com.yahoo.vespa.http.client.config.FeedParams; import org.apache.hadoop.conf.Configuration; import java.io.IOException; @@ -19,7 +18,6 @@ public class VespaConfiguration { public static final String PROXY_SCHEME = "vespa.feed.proxy.scheme"; public static final String DRYRUN = "vespa.feed.dryrun"; public static final String USE_COMPRESSION = "vespa.feed.usecompression"; - public static final String DATA_FORMAT = "vespa.feed.data.format"; public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; public static final String CONNECTIONS = "vespa.feed.connections"; public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; @@ -29,7 +27,6 @@ public class VespaConfiguration { public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; public static final String NUM_RETRIES = "vespa.feed.num.retries"; - public static final String USE_LEGACY_CLIENT = "vespa.feed.uselegacyclient"; private final Configuration conf; private final Properties override; @@ -39,115 +36,82 @@ public class VespaConfiguration { this.override = override; } - public static VespaConfiguration get(Configuration conf, Properties override) { return new VespaConfiguration(conf, override); } - public String endpoint() { return getString(ENDPOINT); } - public int defaultPort() { return getInt(DEFAULT_PORT, 4080); } - public Optional<Boolean> useSSL() { String raw = getString(USE_SSL); if (raw == null || raw.trim().isEmpty()) return Optional.empty(); return Optional.of(Boolean.parseBoolean(raw)); } - public String proxyHost() { return getString(PROXY_HOST); } - public int proxyPort() { return getInt(PROXY_PORT, 4080); } - public String proxyScheme() { String raw = getString(PROXY_SCHEME); if (raw == null) return "http"; return raw; } - public boolean dryrun() { return getBoolean(DRYRUN, false); } - public boolean useCompression() { return getBoolean(USE_COMPRESSION, true); } - public int numConnections() { return getInt(CONNECTIONS, 1); } - public int throttlerMinSize() { return getInt(THROTTLER_MIN_SIZE, 0); } - public int queryConnectionTimeout() { return getInt(QUERY_CONNECTION_TIMEOUT, 10000); } - public String route() { return getString(ROUTE); } - public int maxSleepTimeMs() { return getInt(MAX_SLEEP_TIME_MS, 10000); } - public int maxInFlightRequests() { return getInt(MAX_IN_FLIGHT_REQUESTS, 500); } - public int randomStartupSleepMs() { return getInt(RANDOM_STARTUP_SLEEP, 30000); } - public int numRetries() { return getInt(NUM_RETRIES, 100); } - - public FeedParams.DataFormat dataFormat() { - String format = getString(DATA_FORMAT); - if ("xml".equalsIgnoreCase(format)) { - return FeedParams.DataFormat.XML_UTF8; - } - return FeedParams.DataFormat.JSON_UTF8; - } - - public int progressInterval() { return getInt(PROGRESS_REPORT, 1000); } - public Optional<Boolean> useLegacyClient() { - String raw = getString(USE_LEGACY_CLIENT); - if (raw == null || raw.trim().isEmpty()) return Optional.empty(); - return Optional.of(Boolean.parseBoolean(raw)); - } - public String getString(String name) { if (override != null && override.containsKey(name)) { return override.getProperty(name); @@ -197,7 +161,6 @@ public class VespaConfiguration { sb.append(PROXY_SCHEME + ": " + proxyScheme() + "\n"); sb.append(DRYRUN + ": " + dryrun() +"\n"); sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); - sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); sb.append(CONNECTIONS + ": " + numConnections() +"\n"); sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); @@ -207,7 +170,6 @@ public class VespaConfiguration { sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n"); sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); - sb.append(USE_LEGACY_CLIENT + ": " + useLegacyClient().map(Object::toString).orElse("<empty>") +"\n"); return sb.toString(); } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index f690e767194..39b24799002 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -24,35 +24,24 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; public class VespaStorageTest { @Test - public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(VespaConfiguration.DATA_FORMAT, "xml"); - assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); - } - - - @Test public void requireThatPremadeOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations.pig"); } - @Test public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); } - @Test public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); } @Test - public void requireThatPremadeOperationsWithJsonLoaderFeedAndNonLegacyClientSucceeds() throws Exception { + public void requireThatPremadeOperationsWithJsonLoaderFeedWithSllSucceeds() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(VespaConfiguration.USE_SSL, Boolean.TRUE.toString()); - conf.set(VespaConfiguration.USE_LEGACY_CLIENT, Boolean.FALSE.toString()); assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig", conf); } @@ -61,19 +50,16 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); } - @Test public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig"); } - @Test public void requireThatFeedVisitDataSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); } - private PigServer setup(String script, Configuration conf) throws Exception { if (conf == null) { conf = new HdfsConfiguration(); @@ -92,12 +78,10 @@ public class VespaStorageTest { return ps; } - private void assertAllDocumentsOk(String script) throws Exception { assertAllDocumentsOk(script, null); } - private void assertAllDocumentsOk(String script, Configuration conf) throws Exception { PigServer ps = setup(script, conf); List<ExecJob> jobs = ps.executeBatch(); |