3434import java .time .Clock ;
3535import java .util .ArrayList ;
3636import java .util .List ;
37+ import java .util .Random ;
3738import java .util .concurrent .CancellationException ;
3839import java .util .concurrent .ConcurrentLinkedQueue ;
3940import java .util .concurrent .Executors ;
@@ -71,6 +72,7 @@ public class BigtableChannelPool extends ManagedChannel {
7172 private final ChannelPoolHealthChecker channelPoolHealthChecker ;
7273 private final AtomicInteger indexTicker = new AtomicInteger ();
7374 private final String authority ;
75+ private final Random rng = new Random ();
7476
7577 public static BigtableChannelPool create (
7678 BigtableChannelPoolSettings settings ,
@@ -138,15 +140,94 @@ public String authority() {
138140 }
139141
140142 /**
141- * Create a {@link ClientCall} on a Channel from the pool chosen in a round-robin fashion to the
142- * remote operation specified by the given {@link MethodDescriptor}. The returned {@link
143- * ClientCall} does not trigger any remote behavior until {@link
144- * ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
143+ * Create a {@link ClientCall} on a Channel from the pool to the remote operation specified by the
144+ * given {@link MethodDescriptor}. The returned {@link ClientCall} does not trigger any remote
145+ * behavior until {@link ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
145146 */
146147 @ Override
147148 public <ReqT , RespT > ClientCall <ReqT , RespT > newCall (
148149 MethodDescriptor <ReqT , RespT > methodDescriptor , CallOptions callOptions ) {
149- return getChannel (indexTicker .getAndIncrement ()).newCall (methodDescriptor , callOptions );
150+ return new AffinityChannel (pickEntryIndex ()).newCall (methodDescriptor , callOptions );
151+ }
152+
153+ /** Pick an entry to use for the next call. */
154+ private int pickEntryIndex () {
155+ switch (settings .getLoadBalancingStrategy ()) {
156+ case ROUND_ROBIN :
157+ return pickEntryIndexRoundRobin ();
158+ case LEAST_IN_FLIGHT :
159+ return pickEntryIndexLeastInFlight ();
160+ case POWER_OF_TWO_LEAST_IN_FLIGHT :
161+ return pickEntryIndexPowerOfTwoLeastInFlight ();
162+ default :
163+ LOG .warning (String .format (
164+ "Unknown load balancing strategy %s, falling back to ROUND_ROBIN." ,
165+ settings .getLoadBalancingStrategy ()));
166+ return pickEntryIndexRoundRobin ();
167+
168+ }
169+ }
170+
171+ /** Pick an entry using the Round Robin algorithm. */
172+ private int pickEntryIndexRoundRobin () {
173+ return indexTicker .getAndIncrement ();
174+ }
175+
176+ /** Pick an entry at random. */
177+ private int pickEntryIndexRandom () {
178+ List <Entry > localEntries = entries .get ();
179+ for (int attempt = 0 ; attempt < 5 ; attempt ++) {
180+ int choice = rng .nextInt (localEntries .size ());
181+ if (localEntries .get (choice ).retainable ()) {
182+ return choice ;
183+ }
184+ }
185+ LOG .warning ("couldn't find retainable channel, picking at random" );
186+ return rng .nextInt (localEntries .size ());
187+ }
188+
189+ /** Pick an entry using the least-in-flight algorithm. */
190+ private int pickEntryIndexLeastInFlight () {
191+ List <Entry > localEntries = entries .get ();
192+ int minRpcs = Integer .MAX_VALUE ;
193+ List <Integer > candidates = new ArrayList <>();
194+
195+ for (int i = 0 ; i < localEntries .size (); i ++) {
196+ Entry entry = localEntries .get (i );
197+ if (entry .retainable ()) {
198+ int rpcs = entry .outstandingRpcs .get ();
199+ if (rpcs < minRpcs ) {
200+ minRpcs = rpcs ;
201+ candidates .clear ();
202+ candidates .add (i );
203+ } else if (rpcs == minRpcs ) {
204+ candidates .add (i );
205+ }
206+ }
207+ }
208+ if (candidates .isEmpty ()) {
209+ LOG .warning (
210+ "Least-in-flight picker couldn't find available channel. Picking at random." );
211+ return pickEntryIndexRandom ();
212+ }
213+ // If there are multiple matching entries, pick one at random.
214+ return candidates .get (rng .nextInt (candidates .size ()));
215+ }
216+
217+ /** Pick an entry using the power-of-two algorithm. */
218+ private int pickEntryIndexPowerOfTwoLeastInFlight () {
219+ List <Entry > localEntries = entries .get ();
220+ int choice1 = pickEntryIndexRandom ();
221+ int choice2 = pickEntryIndexRandom ();
222+ if (choice1 == choice2 ) {
223+ // Try to pick two different entries. If this picks the same entry again, it's likely that
224+ // there's only one healthy channel in the pool and we should proceed anyway.
225+ choice2 = pickEntryIndexRandom ();
226+ }
227+
228+ Entry entry1 = localEntries .get (choice1 );
229+ Entry entry2 = localEntries .get (choice2 );
230+ return entry1 .outstandingRpcs .get () < entry2 .outstandingRpcs .get () ? choice1 : choice2 ;
150231 }
151232
152233 Channel getChannel (int affinity ) {
@@ -392,16 +473,23 @@ void refresh() {
392473 }
393474
394475 /**
395- * Get and retain a Channel Entry. The returned Entry will have its rpc count incremented,
396- * preventing it from getting recycled.
476+ * Returns one of the channels managed by this pool. The pool continues to "own" the channel, and
477+ * the caller should not shut it down.
478+ *
479+ * @param affinity Two calls to this method with the same affinity returns the same channel most
480+ * of the time, if the channel pool was refreshed since the last call, a new channel will be
481+ * returned. The reverse is not true: Two calls with different affinities might return the
482+ * same channel. However, the implementation should attempt to spread load evenly.
397483 */
398- Entry getRetainedEntry (int affinity ) {
484+ private Entry getEntry (int affinity ) {
399485 // The maximum number of concurrent calls to this method for any given time span is at most 2,
400486 // so the loop can actually be 2 times. But going for 5 times for a safety margin for potential
401487 // code evolving
402488 for (int i = 0 ; i < 5 ; i ++) {
403- Entry entry = getEntry (affinity );
404- if (entry .retain ()) {
489+ List <Entry > localEntries = entries .get ();
490+ int index = Math .abs (affinity % localEntries .size ());
491+ Entry entry = localEntries .get (index );
492+ if (entry .retainable ()) {
405493 return entry ;
406494 }
407495 }
@@ -412,23 +500,6 @@ Entry getRetainedEntry(int affinity) {
412500 throw new IllegalStateException ("Bug: failed to retain a channel" );
413501 }
414502
415- /**
416- * Returns one of the channels managed by this pool. The pool continues to "own" the channel, and
417- * the caller should not shut it down.
418- *
419- * @param affinity Two calls to this method with the same affinity returns the same channel most
420- * of the time, if the channel pool was refreshed since the last call, a new channel will be
421- * returned. The reverse is not true: Two calls with different affinities might return the
422- * same channel. However, the implementation should attempt to spread load evenly.
423- */
424- private Entry getEntry (int affinity ) {
425- List <Entry > localEntries = entries .get ();
426-
427- int index = Math .abs (affinity % localEntries .size ());
428-
429- return localEntries .get (index );
430- }
431-
432503 /** Bundles a gRPC {@link ManagedChannel} with some usage accounting. */
433504 static class Entry {
434505 private final ManagedChannel channel ;
@@ -480,14 +551,23 @@ ManagedChannel getManagedChannel() {
480551 int getAndResetMaxOutstanding () {
481552 return maxOutstanding .getAndSet (outstandingRpcs .get ());
482553 }
554+ /**
555+ * The method will return false if the channel is closing and the caller should pick a different
556+ * channel. If the method returned true, the channel is useable and can be retained.
557+ */
558+ private boolean retainable () {
559+ if (shutdownRequested .get ()) {
560+ requestShutdown ();
561+ return false ;
562+ }
563+ return true ;
564+ }
483565
484566 /**
485- * Try to increment the outstanding RPC count. The method will return false if the channel is
486- * closing and the caller should pick a different channel. If the method returned true, the
487- * channel has been successfully retained and it is the responsibility of the caller to release
488- * it.
567+ * Try to increment the outstanding RPC count. It is the responsibility of the caller to check
568+ * that the channel is retainable, and to release it after RPC completion.
489569 */
490- private boolean retain () {
570+ private void retain () {
491571 // register desire to start RPC
492572 int currentOutstanding = outstandingRpcs .incrementAndGet ();
493573
@@ -496,13 +576,6 @@ private boolean retain() {
496576 if (currentOutstanding > prevMax ) {
497577 maxOutstanding .incrementAndGet ();
498578 }
499-
500- // abort if the channel is closing
501- if (shutdownRequested .get ()) {
502- release ();
503- return false ;
504- }
505- return true ;
506579 }
507580
508581 /**
@@ -517,8 +590,8 @@ private void release() {
517590
518591 // Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
519592 // mutual exclusion.
520- if (shutdownRequested .get () && outstandingRpcs . get () == 0 ) {
521- shutdown ();
593+ if (shutdownRequested .get ()) {
594+ requestShutdown ();
522595 }
523596 }
524597
@@ -557,9 +630,13 @@ public String authority() {
557630 @ Override
558631 public <RequestT , ResponseT > ClientCall <RequestT , ResponseT > newCall (
559632 MethodDescriptor <RequestT , ResponseT > methodDescriptor , CallOptions callOptions ) {
560-
561- Entry entry = getRetainedEntry (affinity );
562-
633+ Entry entry = getEntry (affinity );
634+ if (entry .retainable ()) {
635+ entry .retain ();
636+ } else {
637+ LOG .warning ("newCall() called on channel entry that is shutting down. The call may not "
638+ + "succeed." );
639+ }
563640 return new ReleasingClientCall <>(entry .channel .newCall (methodDescriptor , callOptions ), entry );
564641 }
565642 }
0 commit comments