Skip to content

Conversation

@lucasbru
Copy link
Member

@lucasbru lucasbru commented Jan 28, 2025

Implements a memory model for representing streams groups in the group coordinator, as well as group count and rebalance metrics.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru lucasbru force-pushed the kip1071merge/streams_group branch from b9894e7 to ad4f4f3 Compare February 3, 2025 13:19
@lucasbru
Copy link
Member Author

lucasbru commented Feb 3, 2025

Rebased on latest target assignment builder

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 8 out of 16 changed files in this pull request and generated 1 comment.

Files not reviewed (8)
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: Evaluated as low risk
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java: Evaluated as low risk
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java: Evaluated as low risk
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java: Evaluated as low risk
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java: Evaluated as low risk
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java: Evaluated as low risk
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: Evaluated as low risk
  • group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java: Evaluated as low risk
Comments suppressed due to low confidence (1)

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java:126

  • The method name testNumTasksThrowsExceptionWhenTopologyNotConfigured is inconsistent with the actual method being tested (maxNumInputPartitions). It should be renamed to testMaxNumInputPartitionsThrowsExceptionWhenTopologyNotConfigured.
void testNumTasksThrowsExceptionWhenTopologyNotConfigured() {

@lucasbru lucasbru force-pushed the kip1071merge/streams_group branch from ad4f4f3 to ab70816 Compare February 3, 2025 14:30
@cadonna cadonna added streams KIP-1071 PRs related to KIP-1071 core Kafka Broker labels Feb 3, 2025
@lucasbru lucasbru force-pushed the kip1071merge/streams_group branch from ab70816 to 8866781 Compare February 4, 2025 08:58
@lucasbru
Copy link
Member Author

lucasbru commented Feb 4, 2025

Rebased on latest trunk

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @lucasbru !

I did a first pass over the production code.

Comment on lines 169 to 171
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetActiveTasksAssignment;
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetStandbyTasksAssignment;
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetWarmupTasksAssignment;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some more info to the javadoc? This data structure is a bit hard to grasp.
A visualization like the following would help:

subtopology -> partition -> memberId

I am wondering if using a class like TaskId for the key would make the code simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of renaming this data structures to currrentXTaskToMemberId?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is not used at all right now. In the consumer group, this is used to optimize the assignor, but we don't use it in our assignors. I think I will remove it for now, we can bring it back once we optimize the assignors.

Comment on lines 177 to 179
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> currentActiveTaskProcessId;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskProcessIds;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskProcessIds;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above comment also applies to this data structure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of renaming this data structures to currrentXTaskToProcessId?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Bruno about a slight renaming for clarity.

}

/**
* @return An immutable Map containing all the target assignment keyed by member id.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return An immutable Map containing all the target assignment keyed by member id.
* @return An immutable map containing all the target assignment keyed by member ID.

Here and elsewhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Removes the target assignment of a member.
*
* @param memberId The member id.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param memberId The member id.
* @param memberId The member ID.

Here and in other java docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
* Returns the current processId of a task or null if the task does not have one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns the current processId of a task or null if the task does not have one.
* Returns the current process ID of a task or null if the task does not have one.

Here and in other places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +646 to +490
if (tasks == null) {
return null;
} else {
return tasks.getOrDefault(taskId, null);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to change this to Optional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may have to checked in tight loops inside the assignor. I think it's a good place to avoid creating extra objects just for returning a value.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a pass over the test code.

}

@Test
public void testAsListedGroup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this test. The test name is testAsListedGroup() but the method asListedGroup() is never called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. This is what is being tested in the other groups as well, interestingly. I think what is being tested here is covered by other tests already. I replaced it by a test that makes sense to me.

.setGroupType(type().toString());
}

public ConfiguredTopology configuredTopology() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to return the Optional here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try it.

return configuredTopology.get().orElse(null);
}

public StreamsTopology topology() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to return the Optional here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try it

return topology.get().orElse(null);
}

public void setTopology(StreamsTopology topology) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a test that verifies that the configured topology is updated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param groupInstanceId The group instance id.
* @return The member id corresponding to the given instance id or null if it does not exist
*/
public String staticMemberId(String groupInstanceId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also here we might want to return an Optional. However, we can postpone this decision to when we will use this method in the handlers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's postpone it.

}

@Test
public void testUpdateInvertedAssignment() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also add a case where the subtopology ID is modified?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the interverted assignment because we are not using it (yet)

*
* @param partitionMetadata The new partition metadata.
*/
public void setPartitionMetadata(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add unit test for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param clusterImage The current metadata for the Kafka cluster.
* @return An immutable map of partition metadata for each topic that the streams topology is using (besides non-repartition sink topics)
*/
public Map<String, TopicMetadata> computePartitionMetadata(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this will be used in the future. Could you please add unit tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I also realized that the rack information that we included here for future proofing (but not rack-aware assignment is currently done / planned) was removed from consumer groups due to the memory pressure it caused. I also updated this to not include the rack information. We can always bring it back in the future. This required some additional changes (but only removals).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part of the GroupMetadataManager that calls this method is a bit weird. The group computes and returns the partition metadata so that the computed partition metadata can be passed back into the group. We should consider simplifying that part when we get to the GroupMetadataManager.

Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @lucasbru - I don't have any additional comments beyond what Bruno has, overall this looks good to me modulo addressing those comments.

Comment on lines 177 to 179
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> currentActiveTaskProcessId;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskProcessIds;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskProcessIds;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Bruno about a slight renaming for clarity.

/**
* @return The current state based on committed offset.
*/
public StreamsGroupState state(long committedOffset) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this method called? How often is the offset updated? From the parameter name, I'm inferring that it occurs after every commit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really called. We always get the latest state, although we can get the state at every committed offset of the consumer offset topic. I removed it, we can always bring it back

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reviews! Addressed all comments. Ready for re-review @cadonna @bbejeck

Comment on lines 169 to 171
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetActiveTasksAssignment;
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetStandbyTasksAssignment;
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetWarmupTasksAssignment;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is not used at all right now. In the consumer group, this is used to optimize the assignor, but we don't use it in our assignors. I think I will remove it for now, we can bring it back once we optimize the assignors.

Comment on lines 177 to 179
private final TimelineHashMap<String, TimelineHashMap<Integer, String>> currentActiveTaskProcessId;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskProcessIds;
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskProcessIds;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.setGroupType(type().toString());
}

public ConfiguredTopology configuredTopology() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try it.

return configuredTopology.get().orElse(null);
}

public StreamsTopology topology() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try it

/**
* Removes the target assignment of a member.
*
* @param memberId The member id.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param groupInstanceId The group instance id.
* @return The member id corresponding to the given instance id or null if it does not exist
*/
public String staticMemberId(String groupInstanceId) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's postpone it.

*
* @param partitionMetadata The new partition metadata.
*/
public void setPartitionMetadata(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* @param clusterImage The current metadata for the Kafka cluster.
* @return An immutable map of partition metadata for each topic that the streams topology is using (besides non-repartition sink topics)
*/
public Map<String, TopicMetadata> computePartitionMetadata(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I also realized that the rack information that we included here for future proofing (but not rack-aware assignment is currently done / planned) was removed from consumer groups due to the memory pressure it caused. I also updated this to not include the rack information. We can always bring it back in the future. This required some additional changes (but only removals).

@lucasbru lucasbru force-pushed the kip1071merge/streams_group branch from 77921b0 to 98bcef4 Compare February 7, 2025 16:30
Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @lucasbru !

Here my comments.

* @param clusterImage The current metadata for the Kafka cluster.
* @return An immutable map of partition metadata for each topic that the streams topology is using (besides non-repartition sink topics)
*/
public Map<String, TopicMetadata> computePartitionMetadata(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part of the GroupMetadataManager that calls this method is a bit weird. The group computes and returns the partition metadata so that the computed partition metadata can be passed back into the group. We should consider simplifying that part when we get to the GroupMetadataManager.

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments addressed, ready for re-review @cadonna

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @lucasbru !

I had one minor comment.

LGTM!

@lucasbru lucasbru merged commit c70b7c4 into apache:trunk Feb 12, 2025
7 of 9 checks passed
pdruley pushed a commit to pdruley/kafka that referenced this pull request Feb 12, 2025
Implements a memory model for representing streams groups in the group coordinator, as well as group count and rebalance metrics.

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
Implements a memory model for representing streams groups in the group coordinator, as well as group count and rebalance metrics.

Reviewers: Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker KIP-1071 PRs related to KIP-1071 streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants