4040import java .util .Collections ;
4141import java .util .List ;
4242import java .util .concurrent .ArrayBlockingQueue ;
43- import java .util .concurrent .BlockingQueue ;
4443import java .util .concurrent .CancellationException ;
4544import java .util .concurrent .ExecutionException ;
4645import java .util .concurrent .Executor ;
@@ -267,7 +266,7 @@ static class StreamingRead extends BaseObjectReadSessionStreamRead<ScatteringByt
267266
268267 private final Hasher hasher ;
269268 private final SettableApiFuture <Void > failFuture ;
270- private final BlockingQueue <Closeable > queue ;
269+ private final ArrayBlockingQueue <Closeable > queue ;
271270
272271 private AtomicLong readId ;
273272 private boolean complete ;
@@ -384,7 +383,6 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
384383 }
385384
386385 long read = 0 ;
387- long dstsRemaining = Buffers .totalRemaining (dsts , offset , length );
388386 if (leftovers != null ) {
389387 read += leftovers .copy (dsts , offset , length );
390388 if (!leftovers .hasRemaining ()) {
@@ -393,34 +391,41 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
393391 }
394392 }
395393
396- Object poll ;
397- while (read < dstsRemaining && (poll = queue .poll ()) != null ) {
398- if (poll instanceof ChildRef ) {
399- ChildRefHelper ref = new ChildRefHelper ((ChildRef ) poll );
400- read += ref .copy (dsts , offset , length );
401- if (ref .hasRemaining ()) {
402- leftovers = ref ;
394+ try {
395+ Object poll ;
396+ while (leftovers == null && (poll = queue .poll (10 , TimeUnit .MICROSECONDS )) != null ) {
397+ if (poll instanceof ChildRef ) {
398+ ChildRefHelper ref = new ChildRefHelper ((ChildRef ) poll );
399+ read += ref .copy (dsts , offset , length );
400+ if (ref .hasRemaining ()) {
401+ leftovers = ref ;
402+ break ;
403+ } else {
404+ ref .ref .close ();
405+ }
406+ } else if (poll == EofMarker .INSTANCE ) {
407+ complete = true ;
408+ if (read == 0 ) {
409+ close ();
410+ return -1 ;
411+ }
403412 break ;
404- } else {
405- ref .ref .close ();
406- }
407- } else if (poll == EofMarker .INSTANCE ) {
408- complete = true ;
409- if (read == 0 ) {
413+ } else if (poll instanceof SmuggledFailure ) {
414+ SmuggledFailure throwable = (SmuggledFailure ) poll ;
410415 close ();
411- return -1 ;
416+ BaseServiceException coalesce = StorageException .coalesce (throwable .getSmuggled ());
417+ throw new IOException (coalesce );
418+ } else {
419+ //noinspection DataFlowIssue
420+ Preconditions .checkState (
421+ false , "unhandled queue element type %s" , poll .getClass ().getName ());
412422 }
413- break ;
414- } else if (poll instanceof SmuggledFailure ) {
415- SmuggledFailure throwable = (SmuggledFailure ) poll ;
416- close ();
417- BaseServiceException coalesce = StorageException .coalesce (throwable .getSmuggled ());
418- throw new IOException (coalesce );
419- } else {
420- //noinspection DataFlowIssue
421- Preconditions .checkState (
422- false , "unhandled queue element type %s" , poll .getClass ().getName ());
423423 }
424+ } catch (InterruptedException e ) {
425+ Thread .currentThread ().interrupt ();
426+ InterruptedIOException ioe = new InterruptedIOException ();
427+ ioe .initCause (e );
428+ throw ioe ;
424429 }
425430
426431 return read ;
@@ -431,7 +436,9 @@ private void offer(Closeable offer) throws InterruptedIOException {
431436 queue .put (offer );
432437 } catch (InterruptedException e ) {
433438 Thread .currentThread ().interrupt ();
434- throw new InterruptedIOException ();
439+ InterruptedIOException ioe = new InterruptedIOException ();
440+ ioe .initCause (e );
441+ throw ioe ;
435442 }
436443 }
437444
0 commit comments