KAFKA-18914 Migrate ConsumerRebootstrapTest to use new test infra #19154
Conversation
A label of 'needs-attention' was automatically added to this PR in order to raise the
@clarkwtc could you please merge it into ClientRebootstrapTest?
@chia7712
@clarkwtc thanks for this patch.
```java
try (var producer = clusterInstance.producer()) {
    var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 1".getBytes(), "value 1".getBytes())).get();
    assertEquals(1, recordMetadata.offset());
    producer.flush();
```
we don't need to call `flush` after calling `get`
Maybe we should remove the call to `get` instead?
I need to call flush; without it, if the broker restarts during rebootstrap, the consumer polls nothing.
Given that acks is not set to 0, the get method guarantees that the record is transmitted to the server. This behavior is equivalent to a flush operation.
Sure, set acks=-1 to guarantee that the record will not be lost.
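The point being discussed can be illustrated without a broker: `KafkaProducer.send` returns a `Future`, and with `acks=1` or `acks=-1` that future completes only once the broker has acknowledged the record, so blocking on `get` already gives the delivery guarantee that a separate `flush` would. A minimal stand-alone sketch of that semantics, where an executor plays the role of the broker (class and variable names are illustrative, not from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GetVsFlush {
    public static void main(String[] args) throws Exception {
        ExecutorService broker = Executors.newSingleThreadExecutor();

        // Simulate producer.send(): the future completes only after the "broker" acks.
        CompletableFuture<Long> send = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // network + append delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0L; // the record's offset, as in RecordMetadata.offset()
        }, broker);

        // get() blocks until the ack arrives, so no flush() is needed afterwards.
        long offset = send.get();
        System.out.println("acked at offset " + offset);
        broker.shutdown();
    }
}
```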
```java
var tp = new TopicPartition(TOPIC, part);

try (var producer = clusterInstance.producer()) {
    var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 0".getBytes(), "value 0".getBytes())).get();
```
there is only one partition, so we can streamline the code:
```java
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value 0".getBytes())).get();
```
Got it; I'll make it simple.
```java
clusterInstance.startBroker(broker0);

try (var producer = clusterInstance.producer()) {
    var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 1".getBytes(), "value 1".getBytes())).get();
```
ditto
I fixed it.
```java
// Only the server 1 is available for the consumer during the bootstrap.
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
```
we can't assume the records are returned by a single poll. Could you please add a loop to avoid flakiness?
Okay, I'll use TestUtils.waitForCondition to prevent flakiness.
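For context, the retry loop that `TestUtils.waitForCondition` performs can be sketched in plain Java. This is an illustrative stand-in, not Kafka's actual implementation; the method name, timeout, and the simulated-consumer counter are all assumptions for the example:

```java
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {
    // Illustrative stand-in for org.apache.kafka.test.TestUtils.waitForCondition:
    // re-check a predicate until it holds or the timeout elapses.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(50); // brief pause between checks
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] polls = {0};
        // Simulate a consumer that only sees the record after a few polls,
        // e.g. while a broker is coming back during rebootstrap.
        waitForCondition(() -> ++polls[0] >= 3, 5_000, "record was not polled in time");
        System.out.println("record polled after " + polls[0] + " attempts");
    }
}
```

Wrapping the `poll` assertion in such a loop means a slow metadata refresh no longer fails the test on the first empty poll.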
```java
// The server 1 originally cached during the bootstrap, is offline.
// However, the server 0 from the bootstrap list is online.
assertEquals(1, consumer.poll(Duration.ofSeconds(1)).count());
```
ditto
I fixed it.
```java
var partitions = List.of(new TopicPartition(TOPIC, part));

try (var producer = clusterInstance.producer()) {
    var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, part, "key 0".getBytes(), "value 0".getBytes())).get();
```
ditto
I fixed it.
- Set acks=1 so the record is written to the local log.
- Wait for the consumer to poll the record, to avoid flakiness.
- Streamline ProducerRecord since there is only one partition.
…o acknowledge the record.
…ache#19154) Migrate ConsumerRebootstrapTest to the new test infra and remove the old Scala test.

The PR changed the following:

* Migrated `ConsumerRebootstrapTest` to the new test infra and removed the old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in the `client-integration-tests` module.
* Removed `RebootstrapTest.scala`.

Default `ConsumerRebootstrap` config:

> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
> properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "300000");
> properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
> properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000");
> properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
> properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L");

The test case for the consumer with enabled rebootstrap: (screenshot)

The test case for the consumer with disabled rebootstrap: (screenshot)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>