Skip to content

Conversation

@clarkwtc
Copy link
Contributor

@clarkwtc clarkwtc commented Mar 11, 2025

Follow-up: #19094

We need to rewrite AdminClientRebootstrapTest to cover the current scenario.
Added the admin client with rebootstrap disabled, as the admin client uses the default AdminClientRebootstrap configuration setting.

Default AdminClientRebootstrap config:

properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "3000000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L");

The test case for the admin client with enabled rebootstrap
Screenshot 2025-03-11 at 11 57 00 PM

The test case for the admin client with disabled rebootstrap
Screenshot 2025-03-11 at 11 57 38 PM

Reviewers: Jhen-Yung Hsu jhenyunghsu@gmail.com, TengYao Chi kitingiao@gmail.com, Ken Huang s7133700@gmail.com, Chia-Ping Tsai chia7712@gmail.com

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) labels Mar 11, 2025
Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clarkwtc : Thanks for the patch.

// Only the server 0 is available for the admin client during the bootstrap.
TestUtils.waitForCondition(() -> admin.listTopics().names().get(timeout, TimeUnit.MINUTES).contains(topic),
// Only the broker 1 is available for the admin client during the bootstrap.
TestUtils.waitForCondition(() -> admin.listTopics().names().get(timeout, TimeUnit.MINUTES).contains(TOPIC),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why did we have a 5-minute timeout here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments.
Maybe it’s a bit too long to wait here.
I updated to 60 seconds.

clusterInstance.startBroker(broker0);

// The broker 1, originally cached during the bootstrap, is offline.
// The admin client will throw a TimeoutException because the brokers are offline during the bootstrap list
Copy link
Collaborator

@Yunyung Yunyung Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code comment is not correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments.
Sure, this situation refers to the brokers being offline since they were cached during the bootstrap.
I updated the code comments.

@chia7712
Copy link
Member

@clarkwtc cloud you please move this test to clients-integration-tests module?

// The broker 1, originally cached during the bootstrap, is offline.
// The admin client will throw a TimeoutException because the brokers are offline during the bootstrap list
assertThrows(TimeoutException.class, () -> admin.listTopics().names().get(5, TimeUnit.SECONDS));
// The admin client needs to wait the default timeout for other threads because the brokers are offline.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

// The admin client will throw a TimeoutException because the brokers are offline during the bootstrap list
assertThrows(TimeoutException.class, () -> admin.listTopics().names().get(5, TimeUnit.SECONDS));
// The admin client needs to wait the default timeout for other threads because the brokers are offline.
admin.close(Duration.ofSeconds(0));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Duration.ZERO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it.

@clarkwtc
Copy link
Contributor Author

@chia7712
I already moved this test to clients-integration-tests.

@chia7712
Copy link
Member

@clarkwtc please check the build error

@clarkwtc
Copy link
Contributor Author

@chia7712
Sorry, I missed it.
I've fixed it.

@github-actions github-actions bot removed the triage PRs from the community label Mar 13, 2025
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clarkwtc thanks for your patch.

private static final int PARTITIONS = 2;

@ClusterTest(
brokers = 2,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 -> PARTITIONS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments.
I've fixed it.

}

@ClusterTest(
brokers = 2,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it.

brokers = 2,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need those configs, since this test case does not use either producer or consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wrote the config for ProducerClientRebootstrapTest, but I messed up how to use it.

brokers = 2,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it.


import static org.junit.jupiter.api.Assertions.assertThrows;

public class AdminClientRebootstrapTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can rename it to ClientRebootstrapTest and merge producer/consumer into this class

Copy link
Contributor Author

@clarkwtc clarkwtc Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem;
I also changed ClientRebootstrapTest file location from org/apache/kafka/clients/admin to org/apache/kafka/clients and renamed functions.

Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this patch, a little comment

admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 2)));

// Only the broker 1 is available for the admin client during the bootstrap.
Assertions.assertDoesNotThrow(() -> admin.listTopics().names().get(timeout, TimeUnit.SECONDS).contains(TOPIC));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: static import

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comments.
I've fixed it.

- Change Assertions to static import
- Redefine AdminClientRebootstrapTest to ClientRebootstrapTest
- Remove unused config UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clarkwtc thanks for this patch.


public class ClientRebootstrapTest {
private static final String TOPIC = "topic";
private static final int PARTITIONS = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the purpose is to ensure each broker has a replica, and hence it should be renamed to "REPLICAS"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've fixed it.

clusterInstance.shutdownBroker(broker0);

try (var admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) 2)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All we need is "one partition" and "two replicas", right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we only need one partition across two brokers because the topic will be replicated to both brokers.

@chia7712 chia7712 merged commit e05b0e6 into apache:trunk Mar 15, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved clients core Kafka Broker tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants