Skip to content

Commit d0ffe18

Browse files
authored
fix: update BlobAppendableUpload implementation to periodically flush for large writes (#3278)
This main idea here is to allow async incremental clearing of the outbound queue even when large writes are performed. Previously, when using the MinFlushStrategy, if a large write was performed (larger than maxPendingBytes) a single `flush: true state_lookup: true` would be sent to GCS, thereby making it so that no new writes could be accepted until the full `maxPendingBytes` where ack'd. This change updates so that if a write is larger than `minFlushSize` a message will be annotated `flush: true state_lookup: true`. This doesn't necessarily mean that a flush will be done every `minFlushSize` as the message packed can be up to 2MiB, this will simply annotate a message as `flush: true state_lookup: true` if it has been at least `minFlushSize` bytes since we sent a flush.
1 parent 3240f67 commit d0ffe18

File tree

4 files changed

+61
-23
lines changed

4 files changed

+61
-23
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,30 @@ final class BidiAppendableUnbufferedWritableByteChannel implements UnbufferedWri
3131

3232
private final BidiUploadStreamingStream stream;
3333
private final ChunkSegmenter chunkSegmenter;
34+
private final long flushInterval;
3435

3536
private boolean open;
3637
private long writeOffset;
3738
private volatile boolean nextWriteShouldFinalize;
3839
private boolean writeCalledAtLeastOnce;
40+
private long lastFlushOffset;
3941

4042
/** If write throws an error, don't attempt to finalize things when {@link #close()} is called. */
4143
private boolean writeThrewError;
4244

4345
BidiAppendableUnbufferedWritableByteChannel(
44-
BidiUploadStreamingStream stream, ChunkSegmenter chunkSegmenter, long writeOffset) {
46+
BidiUploadStreamingStream stream,
47+
ChunkSegmenter chunkSegmenter,
48+
long flushInterval,
49+
long writeOffset) {
4550
this.stream = stream;
4651
this.chunkSegmenter = chunkSegmenter;
52+
this.flushInterval = flushInterval;
4753
this.open = true;
4854
this.writeOffset = writeOffset;
4955
this.nextWriteShouldFinalize = false;
5056
this.writeThrewError = false;
57+
this.lastFlushOffset = writeOffset;
5158
}
5259

5360
@Override
@@ -141,8 +148,9 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th
141148
for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) {
142149
ChunkSegment datum = data[i];
143150
int size = datum.getB().size();
151+
boolean shouldFlush = writeOffset + size >= lastFlushOffset + flushInterval;
144152
boolean appended;
145-
if (i < lastIdx) {
153+
if (i < lastIdx && !shouldFlush) {
146154
appended = stream.append(datum);
147155
} else if (i == lastIdx && nextWriteShouldFinalize) {
148156
appended = stream.appendAndFinalize(datum);
@@ -152,6 +160,9 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th
152160
if (appended) {
153161
bytesConsumed += size;
154162
writeOffset += size;
163+
if (shouldFlush) {
164+
lastFlushOffset = writeOffset;
165+
}
155166
} else {
156167
// if we weren't able to trigger a flush by reaching the end of the array and calling
157168
// appendAndFlush, explicitly call flush here so that some progress can be made.
@@ -171,6 +182,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th
171182

172183
private void awaitResultFuture() throws IOException {
173184
try {
185+
stream.awaitAckOf(writeOffset);
174186
stream.getResultFuture().get(10_717, TimeUnit.MILLISECONDS);
175187
} catch (InterruptedException e) {
176188
Thread.currentThread().interrupt();

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,9 @@ abstract static class BaseUploadState extends BidiUploadState {
327327
protected @NonNull State state;
328328
protected @MonotonicNonNull BidiWriteObjectResponse lastResponseWithResource;
329329
protected @Nullable State stateToReturnToAfterRetry;
330-
protected boolean finalFlushSignaled;
330+
protected long finalFlushOffset;
331331
protected boolean finalFlushSent;
332-
protected boolean finishWriteSignaled;
332+
protected long finishWriteOffset;
333333
protected boolean finishWriteSent;
334334
protected @MonotonicNonNull OpenArguments lastOpenArguments;
335335
protected @Nullable SettableApiFuture<Void> pendingReconciliation;
@@ -356,6 +356,8 @@ private BaseUploadState(
356356
this.totalSentBytes = 0;
357357
this.confirmedBytes = -1;
358358
this.state = startingState;
359+
this.finalFlushOffset = -1;
360+
this.finishWriteOffset = -1;
359361
}
360362

361363
@Override
@@ -463,17 +465,17 @@ public boolean finalFlush(long totalLength) {
463465

464466
BidiWriteObjectRequest currentLast = peekLast();
465467
boolean equals = flush.equals(currentLast);
466-
if (equals && finalFlushSignaled) {
468+
if (equals && finalFlushOffset == totalLength) {
467469
return true;
468470
} else if (equals && lastSentRequestIndex == queue.size() - 1) {
469-
finalFlushSignaled = true;
471+
finalFlushOffset = totalLength;
470472
finalFlushSent = true;
471473
return true;
472474
}
473475

474476
boolean offered = internalOffer(flush);
475477
if (offered) {
476-
finalFlushSignaled = true;
478+
finalFlushOffset = totalLength;
477479
}
478480
return offered;
479481
} finally {
@@ -561,7 +563,9 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
561563
} else if (peek.hasOneof(FIRST_MESSAGE_DESCRIPTOR)) {
562564
poll();
563565
} else if (peek.getFlush()) {
564-
if (finalFlushSent && persistedSize == totalSentBytes) {
566+
if (finalFlushSent
567+
&& persistedSize == totalSentBytes
568+
&& persistedSize == finalFlushOffset) {
565569
setConfirmedBytes(persistedSize);
566570
signalTerminalSuccess = true;
567571
poll();
@@ -575,7 +579,9 @@ final void updateStateFromResponse(BidiWriteObjectResponse response) {
575579
checkState(
576580
enqueuedBytes == 0,
577581
"attempting to evict finish_write: true while bytes are still enqueued");
578-
if (response.hasResource() && persistedSize == totalSentBytes) {
582+
if (response.hasResource()
583+
&& persistedSize == totalSentBytes
584+
&& persistedSize == finishWriteOffset) {
579585
setConfirmedBytes(persistedSize);
580586
if (response.getResource().hasFinalizeTime()) {
581587
signalTerminalSuccess = true;
@@ -697,7 +703,7 @@ final void retrying() {
697703
final boolean isFinalizing() {
698704
lock.lock();
699705
try {
700-
return finishWriteSignaled && finishWriteSent;
706+
return finishWriteOffset >= 0 && finishWriteSent;
701707
} finally {
702708
lock.unlock();
703709
}
@@ -753,7 +759,7 @@ final void sendVia(Consumer<BidiWriteObjectRequest> consumer) {
753759
if (prev != null) {
754760
if (prev.getFinishWrite()) {
755761
finishWriteSent = true;
756-
} else if (prev.getFlush() && prev.getStateLookup() && finalFlushSignaled) {
762+
} else if (prev.getFlush() && prev.getStateLookup() && finalFlushOffset > -1) {
757763
finalFlushSent = true;
758764
}
759765
consumer.accept(prev);
@@ -823,7 +829,7 @@ protected final void validateCurrentStateIsOneOf(State... allowed) {
823829

824830
private void checkNotFinalizing() {
825831
checkState(
826-
!finishWriteSignaled,
832+
finishWriteOffset == -1,
827833
"Attempting to append bytes even though finalization has previously been signaled.");
828834
}
829835

@@ -835,23 +841,26 @@ protected final boolean internalOffer(BidiWriteObjectRequest e) {
835841
}
836842
add = this::prepend;
837843
}
838-
if (e.getFinishWrite()) {
839-
finishWriteSignaled = true;
840-
}
841844

842-
if (e.hasChecksummedData() && !finishWriteSignaled) {
845+
boolean appended = false;
846+
if (e.hasChecksummedData() && finishWriteOffset == -1) {
843847
ChecksummedData checksummedData = e.getChecksummedData();
844848
int size = checksummedData.getContent().size();
845849
if (size <= availableCapacity()) {
846850
totalSentBytes += size;
847851
add.accept(e);
848-
return true;
852+
appended = true;
849853
}
850-
return false;
851854
} else {
852855
add.accept(e);
853-
return true;
856+
appended = true;
854857
}
858+
859+
if (e.getFinishWrite()) {
860+
finishWriteOffset = totalSentBytes;
861+
}
862+
863+
return appended;
855864
}
856865

857866
@Nullable

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.storage.BidiUploadState.TakeoverAppendableUploadState;
2727
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
2828
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
29+
import com.google.cloud.storage.FlushPolicy.MinFlushSizeFlushPolicy;
2930
import com.google.cloud.storage.Storage.BlobWriteOption;
3031
import com.google.cloud.storage.TransportCompatibility.Transport;
3132
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
@@ -266,9 +267,14 @@ BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectT
266267
stream.awaitTakeoverStateReconciliation();
267268
c =
268269
new BidiAppendableUnbufferedWritableByteChannel(
269-
stream, chunkSegmenter, state.getConfirmedBytes());
270+
stream,
271+
chunkSegmenter,
272+
flushInterval(flushPolicy),
273+
state.getConfirmedBytes());
270274
} else {
271-
c = new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 0);
275+
c =
276+
new BidiAppendableUnbufferedWritableByteChannel(
277+
stream, chunkSegmenter, flushInterval(flushPolicy), 0);
272278
}
273279
return new AppendableObjectBufferedWritableByteChannel(
274280
flushPolicy.createBufferedChannel(c, /* blocking= */ false),
@@ -282,6 +288,15 @@ BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectT
282288
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
283289
}
284290

291+
private static long flushInterval(FlushPolicy fp) {
292+
if (fp instanceof MinFlushSizeFlushPolicy) {
293+
MinFlushSizeFlushPolicy min = (MinFlushSizeFlushPolicy) fp;
294+
return min.getMinFlushSize();
295+
} else {
296+
return fp.getMaxPendingBytes();
297+
}
298+
}
299+
285300
private static final class AppendableSession
286301
extends ChannelSession<
287302
AppendableUploadState,

google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti
489489
3,
490490
storage.storageDataClient.retryContextProvider.create()),
491491
smallSegmenter,
492+
3,
492493
0);
493494
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
494495
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
@@ -638,6 +639,7 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part
638639
3,
639640
storage.storageDataClient.retryContextProvider.create()),
640641
smallSegmenter,
642+
3,
641643
0);
642644
ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
643645
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
@@ -791,7 +793,7 @@ public void testFlushMultipleSegments_200ResponsePartialFlushHalfway() throws Ex
791793
3,
792794
storage.storageDataClient.retryContextProvider.create());
793795
BidiAppendableUnbufferedWritableByteChannel channel =
794-
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 0);
796+
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 3, 0);
795797
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
796798
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
797799
channel.nextWriteShouldFinalize();
@@ -1049,7 +1051,7 @@ private static void runTestFlushMultipleSegments(FakeStorage fake) throws Except
10491051
3,
10501052
storage.storageDataClient.retryContextProvider.create());
10511053
BidiAppendableUnbufferedWritableByteChannel channel =
1052-
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 0);
1054+
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 32, 0);
10531055
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
10541056
channel.nextWriteShouldFinalize();
10551057
channel.close();

0 commit comments

Comments
 (0)