diff options
Diffstat (limited to 'vespa-http-client/src')
2 files changed, 17 insertions, 23 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index f59d4a4bbba..d510ce4b7ea 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -149,16 +149,12 @@ class ApacheGatewayConnection implements GatewayConnection { private InputStream write(List<Document> docs, boolean drain, boolean useCompression) throws ServerResponseException, IOException { - HttpPost httpPost = createPost(drain, useCompression, false /* this is not hanshake */); + HttpPost httpPost = createPost(drain, useCompression, false); - final ByteBuffer[] buffers = getDataWithStartAndEndOfFeed(docs, negotiatedVersion); - final InputStream inputStream = new ByteBufferInputStream(buffers); - final InputStreamEntity reqEntity; - if (useCompression ) { - reqEntity = zipAndCreateEntity(inputStream); - } else { - reqEntity = new InputStreamEntity(inputStream, -1); - } + ByteBuffer[] buffers = getDataWithStartAndEndOfFeed(docs, negotiatedVersion); + InputStream inputStream = new ByteBufferInputStream(buffers); + InputStreamEntity reqEntity = useCompression ? zipAndCreateEntity(inputStream) + : new InputStreamEntity(inputStream, -1); reqEntity.setChunked(true); httpPost.setEntity(reqEntity); return executePost(httpPost); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index b7638ff0967..dfc5d0d0a66 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -184,22 +184,22 @@ class IOThread implements Runnable, AutoCloseable { chunkSizeBytes = doc.size(); } } catch (InterruptedException ie) { - log.fine("Got break signal while waiting for new documents to feed."); + log.fine("Got break signal while waiting for new documents to feed"); return docsForSendChunk; } int pendingSize = 1 + resultQueue.getPendingSize(); // see if we can get more documents without blocking + while (chunkSizeBytes < maxChunkSizeBytes && pendingSize < maxInFlightRequests) { drainFirstDocumentsInQueueIfOld(); - Document d = documentQueue.poll(); - if (d == null) { - break; - } - docsForSendChunk.add(d); - chunkSizeBytes += d.size(); + Document document = documentQueue.poll(); + if (document == null) break; + docsForSendChunk.add(document); + chunkSizeBytes += document.size(); pendingSize++; } - log.finest("Chunk has " + docsForSendChunk.size() + " docs with a size " + chunkSizeBytes + " bytes."); + if (log.isLoggable(Level.FINE)) + log.finest("Chunk has " + docsForSendChunk.size() + " docs with a size " + chunkSizeBytes + " bytes"); docsReceivedCounter.addAndGet(docsForSendChunk.size()); return docsForSendChunk; } @@ -271,10 +271,9 @@ class IOThread implements Runnable, AutoCloseable { int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); - List<Document> nextDocsForFeeding = - (pendingResultQueueSize > maxInFlightRequests) - ? new ArrayList<>() // The queue is full, will not send more documents. - : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS); + List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) + ? new ArrayList<>() // The queue is full, will not send more documents + : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS); if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { //we have no unfinished business with the server now. @@ -286,8 +285,7 @@ class IOThread implements Runnable, AutoCloseable { if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) { try { - // Max outstanding document operations, no more results on server side, wait a bit - // before asking again. + // Max outstanding document operations, no more results on server side, wait a bit before asking again Thread.sleep(300); } catch (InterruptedException e) { // Ignore |