-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19226: Added test_console_share_consumer.py #19708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few minor comments to address.
| .describedAs("consumer_prop") | ||
| .ofType(String.class); | ||
| OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") | ||
| OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert this. This does not match the KIP. (I have a draft KIP to align command options for the various tools.)
| """ | ||
| The console consumer is a tool that reads data from Kafka and outputs it to standard output. | ||
| """ | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am surprised this file has any changes at all.
|
|
||
| from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin | ||
| from kafkatest.services.monitor.jmx import JmxMixin, JmxTool | ||
| from kafkatest.version import DEV_BRANCH, LATEST_3_7, get_version, LATEST_4_0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have said this should be DEV_BRANCH, get_version, LATEST_4_1. Specifically not 3.7.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Actually, none of the imports LATEST_3_7, get_version, LATEST_4_0 are being used. I will remove them all, since the file only uses DEV_BRANCH. The version of DEV_BRANCH is 4.1.0, so that should be fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Until DEV_BRANCH moves forwards to 4.2. I think perhaps you'd put DEV_BRANCH, LATEST_4_1 for future-proofing.
| "export KAFKA_OPTS=%(kafka_opts)s; " \ | ||
| "%(console_share_consumer)s " \ | ||
| "--topic %(topic)s " \ | ||
| "--consumer.config %(config_file)s " % args |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumer-config
| cmd += " --bootstrap-server %(broker_list)s" % args | ||
|
|
||
| if self.share_consumer_timeout_ms is not None: | ||
| # version 0.8.X and below do not support --timeout-ms option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line of comment is not relevant.
| if self.print_partition: | ||
| cmd += " --property print.partition=true" | ||
|
|
||
| # LoggingMessageFormatter was introduced after 0.9 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also not relevant.
| for line in share_consumer_output: | ||
| msg = line.strip() | ||
| if msg == "shutdown_complete": | ||
| # Note that we can only rely on shutdown_complete message if running 0.10.0 or greater |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not relevant :)
|
@chirag-wadhwa5 Please merge latest changes. |
| wait_until(lambda: self.has_log_message(node, message), | ||
| timeout_sec=60, | ||
| err_msg="Offset not reset for partition %s-%d" % (topic, partition)) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please revert this changed line too. This PR should not make any changes to unrelated files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. I was pretty sure I added the end line back. It was probably a mistake, sorry about that.
This PR includes the system test file test_console_share_consumer.py
which tests the functioning of ConsoleShareConsumer
Reviewers: Andrew Schofield aschofield@confluent.io