Skip to content

Conversation

@cadonna
Copy link
Member

@cadonna cadonna commented Mar 5, 2025

This commit adds the conditions to decide when a Streams group heartbeat should be sent.
A heartbeat should be sent when:

  • the group coordinator is available
  • the member is part of the Streams group or wants to join it
  • the heartbeat interval expired or the member is leaving the group or acknowledging the assginment

This commit does not implement:

  • not sending fields that did not change
  • handling errors

Reviewers: Zheguang Zhao zheguang.zhao@alumni.brown.edu, Lucas Brutschy lbrutschy@confluent.io

@cadonna cadonna added streams consumer KIP-1071 PRs related to KIP-1071 labels Mar 5, 2025
@cadonna cadonna requested review from Copilot and lucasbru March 5, 2025 16:57
@cadonna cadonna requested review from bbejeck and mjsax March 5, 2025 16:57
@cadonna
Copy link
Member Author

cadonna commented Mar 5, 2025

Call for review: @aliehsaeedii

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Overview

This pull request refactors the heartbeat request management for Kafka Streams by introducing new conditions and timer logic to decide when a heartbeat should be sent. Key changes include:

  • Adding a poll timer and new logic in the heartbeat polling method to handle poll timeouts and skip conditions.
  • Renaming and updating methods in StreamsMembershipManager (from shouldHeartbeatNow to shouldNotWaitForHeartbeatInterval) along with corresponding test updates.
  • Adding detailed JavaDoc comments and new tests to cover various member states and heartbeat scenarios.

Reviewed Changes

File Description
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java Introduces poll timer logic, new conditional branches for sending heartbeats, and improved error propagation via BackgroundEventHandler.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java Updates and new tests to validate heartbeat request behavior under different member states and timer conditions.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java Renames method ensuring consistency in heartbeat interval decision making.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java Adjusts tests to match the updated method naming and expected behavior.

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (2)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:352

  • [nitpick] Consider adding parentheses in the compound condition within shouldHeartbeatBeforeIntervalExpires() to improve clarity of the evaluation order, e.g. by grouping the JOINING and ACKNOWLEDGING checks together before applying the requestInFlight() negation.
private boolean shouldHeartbeatBeforeIntervalExpires() {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:650

  • The heartbeat interval value is cast from long to int; ensure that the interval value never exceeds the limits of an int to prevent potential overflow issues in production.
.setHeartbeatIntervalMs((int) RECEIVED_HEARTBEAT_INTERVAL_MS)

return membershipManager.state() == MemberState.LEAVING
||
(membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
&& !heartbeatRequestState.requestInFlight();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 wondering if the closing parenthesis is off by one clause? By reading the comment above, it seems this intended instead:

||
            (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING
                && !heartbeatRequestState.requestInFlight());```

Copy link
Member Author

@cadonna cadonna Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zheguang Thanks for your comment!

The comment says:

the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request in flight.

That means, the expression should be true for

membershipManager.state() == MemberState.JOINING && !heartbeatRequestState.requestInFlight()

OR

membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight()

The distributive laws of boolean algrebra say:

(B⋅A)+(C⋅A) = (B+C)⋅A 

If we define A to be !heartbeatRequestState.requestInFlight(), B to be membershipManager.state() == MemberState.JOINING && !heartbeatRequestState.requestInFlight(), and C to be membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight(), we can rewrite the above to

(membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING) 
&& !heartbeatRequestState.requestInFlight()

If we applied your suggestion, the expression would be true if

membershipManager.state() == MemberState.JOINING

OR

membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight()

So, the member state needs only to be MemberState.JOINING to send a heartbeat. It would not check for a heartbeat in flight.

Do you agree?

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THanks for the PR, @cadonna ! Here are my comments


/**
* This will build a heartbeat request if one must be sent, determined based on the member
* state. A heartbeat is sent in the following situations:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not separate situations in which a heartbeat must be sent. Maybe you mean "when all of the following conditions apply" or soemthign?

* or
* - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
* in flight.
* @return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something seems to be missing after @return.


private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
return makeHeartbeatRequest(currentTimeMs, this::handleResponse);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this addCompletionCallback is complex, and also seems quite unncessary. Can I not just do a handleResponse(makeHeartbeatRequest(currentTimeMs)) or -even better in my eyes - implement a method for the lambda inside of handleResponse only, and then do a makeHeartbeatRequest(currentTimeMs).whenComplete(handleResponse).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea! Thanks!
Let me know how you like it now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's better!

cadonna added 2 commits March 7, 2025 14:05
This commit adds the conditions to decide when a Streams group
heartbeat should be sent.
A heartbeat should be sent when:
- the group coordinator is available
- the member is part of the Streams group or wants to join it
- the heartbeat interval expired or
  the member is leaving the group or acknowledging the assginment

This commit does not implement:
- not sending fields that did not change
- handling errors
@cadonna cadonna force-pushed the streams_group_heartbeat_request_manager-3 branch from c8a0702 to 312732b Compare March 7, 2025 13:09
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@cadonna cadonna merged commit 59e5890 into apache:trunk Mar 10, 2025
25 checks passed
@cadonna cadonna deleted the streams_group_heartbeat_request_manager-3 branch March 24, 2025 20:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants