Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
Expand All @@ -28,6 +30,7 @@
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

import org.slf4j.Logger;

Expand All @@ -43,6 +46,17 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* <p>Manages the request creation and response handling for the Streams group heartbeat. The class creates a
* heartbeat request using the state stored in the membership manager. The requests can be retrieved
* by calling {@link StreamsGroupHeartbeatRequestManager#poll(long)}. Once the response is received, it updates the
* state in the membership manager and handles any errors.
*
* <p>The heartbeat manager generates heartbeat requests based on the member state. It's also responsible
* for the timing of the heartbeat requests to ensure they are sent according to the heartbeat interval
* (while the member state is stable) or on demand (while the member is acknowledging an assignment or
* leaving the group).
*/
public class StreamsGroupHeartbeatRequestManager implements RequestManager {

static class HeartbeatState {
Expand All @@ -59,6 +73,9 @@ public HeartbeatState(final StreamsRebalanceData streamsRebalanceData,
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

public void reset() {
}

public StreamsGroupHeartbeatRequestData buildRequestData() {
StreamsGroupHeartbeatRequestData data = new StreamsGroupHeartbeatRequestData();
data.setGroupId(membershipManager.groupId());
Expand Down Expand Up @@ -205,7 +222,6 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopi
}
}


private final Logger logger;

private final int maxPollIntervalMs;
Expand All @@ -218,15 +234,24 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopi

private final StreamsMembershipManager membershipManager;

private final BackgroundEventHandler backgroundEventHandler;

private final HeartbeatMetricsManager metricsManager;

private StreamsRebalanceData streamsRebalanceData;

/**
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
* sending heartbeat until the next poll.
*/
private final Timer pollTimer;

public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
final Time time,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final StreamsMembershipManager membershipManager,
final BackgroundEventHandler backgroundEventHandler,
final Metrics metrics,
final StreamsRebalanceData streamsRebalanceData) {
this.logger = logContext.logger(getClass());
Expand All @@ -238,6 +263,10 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
membershipManager,
"Streams membership manager cannot be null"
);
this.backgroundEventHandler = Objects.requireNonNull(
backgroundEventHandler,
"Background event handler cannot be null"
);
this.metricsManager = new HeartbeatMetricsManager(
Objects.requireNonNull(metrics, "Metrics cannot be null")
);
Expand All @@ -254,31 +283,119 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
retryBackoffMaxMs,
maxPollIntervalMs
);
this.pollTimer = time.timer(maxPollIntervalMs);
}

/**
* This will build a heartbeat request if one must be sent, determined based on the member
* state. A heartbeat is sent when all of the following applies:
* <ol>
* <li>Member is part of the consumer group or wants to join it.</li>
* <li>The heartbeat interval has expired, or the member is in a state that indicates
* that it should heartbeat without waiting for the interval.</li>
* </ol>
* This will also determine the maximum wait time until the next poll based on the member's
* state.
* <ol>
* <li>If the member is without a coordinator or is in a failed state, the timer is set
* to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
* <li>If the member cannot send a heartbeat due to either exponential backoff, it will
* return the remaining time left on the backoff timer.</li>
* <li>If the member's heartbeat timer has not expired, It will return the remaining time
* left on the heartbeat timer.</li>
* <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li>
* </ol>
*
* @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
* heartbeat request if one must be sent, and the time to wait until the next poll.
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(
heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
);
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
membershipManager.onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
logger.warn("Consumer poll timeout has expired. This means the time between " +
"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
"which typically implies that the poll loop is spending too much time processing " +
"messages. You can address this either by increasing max.poll.interval.ms or by " +
"reducing the maximum size of batches returned in poll() with max.poll.records.");

membershipManager.onPollTimerExpired();
NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestAndLogResponse(currentTimeMs);

// We can ignore the leave response because we can join before or after receiving the response.
heartbeatRequestState.reset();
heartbeatState.reset();
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
}
if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequestAndHandleResponse(currentTimeMs);
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
} else {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
coordinatorRequestManager.coordinator()
);
request.whenComplete((response, exception) -> {
/**
* A heartbeat should be sent without waiting for the heartbeat interval to expire if:
* - the member is leaving the group
* or
* - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
* in flight.
*
* @return true if a heartbeat should be sent before the interval expires, false otherwise
*/
private boolean shouldHeartbeatBeforeIntervalExpires() {
return membershipManager.state() == MemberState.LEAVING
||
(membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
&& !heartbeatRequestState.requestInFlight();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 wondering if the closing parenthesis is off by one clause? By reading the comment above, it seems this intended instead:

||
            (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING
                && !heartbeatRequestState.requestInFlight());```

Copy link
Member Author

@cadonna cadonna Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zheguang Thanks for your comment!

The comment says:

the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request in flight.

That means, the expression should be true for

membershipManager.state() == MemberState.JOINING && !heartbeatRequestState.requestInFlight()

OR

membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight()

The distributive laws of boolean algrebra say:

(B⋅A)+(C⋅A) = (B+C)⋅A 

If we define A to be !heartbeatRequestState.requestInFlight(), B to be membershipManager.state() == MemberState.JOINING && !heartbeatRequestState.requestInFlight(), and C to be membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight(), we can rewrite the above to

(membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING) 
&& !heartbeatRequestState.requestInFlight()

If we applied your suggestion, the expression would be true if

membershipManager.state() == MemberState.JOINING

OR

membershipManager.state() == MemberState.ACKNOWLEDGING && !heartbeatRequestState.requestInFlight()

So, the member state needs only to be MemberState.JOINING to send a heartbeat. It would not check for a heartbeat in flight.

Do you agree?

}

private void maybePropagateCoordinatorFatalErrorEvent() {
coordinatorRequestManager.getAndClearFatalError()
.ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequestAndLogResponse(final long currentTimeMs) {
return makeHeartbeatRequest(currentTimeMs).whenComplete((response, exception) -> {
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
Errors error = Errors.forCode(((StreamsGroupHeartbeatResponse) response.responseBody()).data().errorCode());
if (error == Errors.NONE)
logger.debug("StreamsGroupHeartbeatRequest responded successfully: {}", response);
else
logger.error("StreamsGroupHeartbeatRequest failed because of {}: {}", error, response);
} else {
logger.error("StreamsGroupHeartbeatRequest failed because of unexpected exception.", exception);
}
});
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequestAndHandleResponse(final long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
return request.whenComplete((response, exception) -> {
long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
onResponse((StreamsGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
}
});
}

private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
coordinatorRequestManager.coordinator()
);
heartbeatRequestState.onSendAttempt(currentTimeMs);
membershipManager.onHeartbeatRequestGenerated();
metricsManager.recordHeartbeatSentMs(currentTimeMs);
heartbeatRequestState.resetTimer();
return request;
}

Expand All @@ -290,17 +407,14 @@ private void onResponse(final StreamsGroupHeartbeatResponse response, long curre

private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
final StreamsGroupHeartbeatResponseData data = response.data();

heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();

if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
}

List<StreamsGroupHeartbeatResponseData.Status> statuses = data.status();

if (statuses != null && !statuses.isEmpty()) {
String statusDetails = statuses.stream()
.map(status -> "(" + status.statusCode() + ") " + status.statusDetail())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ public boolean shouldSkipHeartbeat() {
* @return True if the member should send heartbeat to the coordinator without waiting for
* the interval.
*/
public boolean shouldHeartbeatNow() {
public boolean shouldNotWaitForHeartbeatInterval() {
return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
}

Expand Down
Loading