KAFKA-17431: Support invalid static configs for KRaft so long as dynamic configs are valid #18949
Conversation
A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers.
def voterNode(id: Int, listener: ListenerName): Option[Node]

def getRecordSerde: RecordSerde[T]
In Kafka we generally don't put `get` in front of getters, so this method should just be `recordSerde`.
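For illustration, the preferred shape looks like this (a minimal sketch; both the trait name and the `RecordSerde` stand-in here are illustrative, not the real Kafka types):

```scala
// Sketch of the naming convention only; SerdeProvider and this RecordSerde
// stand-in are made up for illustration.
trait RecordSerde[T]

trait SerdeProvider[T] {
  def recordSerde: RecordSerde[T]       // preferred: no 'get' prefix
  // def getRecordSerde: RecordSerde[T] // the form flagged above
}
```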
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers)
This will be too late to address some of the use-cases we're concerned about, right?
It would be better to load all the config key/value pairs from the snapshot and overwrite the static configs with them, before the first time we create a KafkaConfig object.
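Roughly, the suggested ordering would look like the following sketch (`readConfigsFromLatestSnapshot` and `effectiveStartupProps` are hypothetical helpers, not existing methods):

```scala
import java.util.Properties

// Hypothetical helper: decode the ConfigRecords in the newest metadata
// snapshot. Stubbed here for illustration only.
def readConfigsFromLatestSnapshot(metadataLogDir: String): Map[String, String] =
  Map.empty

// Merge the snapshot overrides into the static properties *before* the first
// KafkaConfig is ever constructed, so a stale static value (e.g. an old
// keystore path) is never validated on its own.
def effectiveStartupProps(staticProps: Properties, metadataLogDir: String): Properties = {
  val merged = new Properties()
  merged.putAll(staticProps)
  readConfigsFromLatestSnapshot(metadataLogDir).foreach {
    case (key, value) => merged.setProperty(key, value)
  }
  merged
}
```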
> This will be too late to address some of the use-cases we're concerned about, right?
Could you illuminate some of these cases for me? It seems to me like we're doing this early enough in brokerServer's startup. Are we instead concerned that bad static configs will prevent even SharedServer#startup, which sets up KRaft and metadata publishers, from completing successfully? I guess I'm just confused as to how a node can even get into a state where the static configs would crash SharedServer#startup, but somehow also have valid dynamic configs?
I'm a bit confused as to what you're proposing. It sounds like this loading of config key/value pairs from the snapshot should occur before we construct the KafkaConfig in SharedServer's initialization (i.e. before we call SharedServer#start, which initializes raftManager)? If so, that means we can't use raftManager to actually perform the snapshot read, right?
> Could you illuminate some of these cases for me?
One use case is if we have changed the network configuration in such a way that the old static configuration isn't valid. The SSL keystore file being moved is a good example. In that case we would not want to start up with the old static configuration.
> If so, that means we can't use raftManager to actually perform the snapshot read, right?
It shouldn't be necessary to use raftManager to read the snapshot, since the snapshot is just a file. Maybe we could have some static utility method which finds the last snapshot file (it's just a matter of finding the one that sorts last in the folder...).
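A minimal sketch of such a utility, assuming KRaft snapshot files end in `.checkpoint` and that their zero-padded offset/epoch file names sort lexicographically in logical order (both assumptions should be verified against the real layout):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._
import scala.util.Using

// Find the snapshot file that sorts last in the metadata log directory.
def latestSnapshotFile(metadataLogDir: String): Option[Path] = {
  val dir = Paths.get(metadataLogDir)
  if (!Files.isDirectory(dir)) None
  else Using.resource(Files.list(dir)) { stream =>
    stream.iterator().asScala
      .filter(_.getFileName.toString.endsWith(".checkpoint"))
      .toSeq
      .sortBy(_.getFileName.toString)
      .lastOption
  }
}
```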
The test I wrote does "move" the SSL keystore file. By invalidating the static ssl configs on one of the nodes after shutting it down, we can successfully complete brokerServer#startup() with my changes. Without them, we fail during startup.
If we want full parity with how ZK worked, we need to load the dynamic configurations prior to sending out the initial dynamic configurations. In ZK mode we actually fetched the broker configuration from ZK prior to doing this. It's possible we could do this in a different way but I'm not confident that it will solve all the possible cases.
> we need to load the dynamic configurations prior to sending out the initial dynamic configurations
I'm a bit confused about what this means. Where in sharedServer do we "send out initial dynamic configurations"?
From a high level, I think the current implementation is doing the equivalent in KRaft: the KRaft layer on the broker, which reads the snapshot, is acting like ZK (although possibly stale) as the store for broker configs (correct me if I'm wrong; I'll also look more into this tomorrow).
The only config I see that is used in sharedServer/raftManager that is also part of DynamicBrokerConfig#AllDynamicConfigs is the LISTENER_SECURITY_PROTOCOL_MAP_CONFIG. This config is read when building the KRaft network client for the broker during startup, but only the entry for the controller listener is used. From my understanding, the only way to invalidate this static config if it was previously valid would be to mess with server.properties directly and then restart the broker. Is that also a case we want to cover with this change?
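For context, the lookup has roughly this shape (illustrative values only, not the actual startup code):

```scala
// Illustrative only: parse a listener.security.protocol.map value and pick
// out the single entry the broker's KRaft network client consults at startup.
val listenerMap = "BROKER:SSL,CONTROLLER:SSL,EXTERNAL:SASL_SSL" // example value
val protocolByListener: Map[String, String] =
  listenerMap.split(",").map { entry =>
    val Array(listener, protocol) = entry.split(":")
    listener -> protocol
  }.toMap
// Only the controller listener's entry matters during startup:
val controllerProtocol = protocolByListener("CONTROLLER")
```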
> I'm a bit confused about what this means. Where in sharedServer do we "send out initial dynamic configurations"?
SharedServer doesn't. This happens from BrokerServer or ControllerServer.
> From a high level, I think the current implementation is doing the equivalent in KRaft, since the KRaft layer on the broker, which reads the snapshot, is acting like ZK (although possibly stale) as the store for broker configs.
If you are confident that this will handle the case that originally motivated this JIRA then I'm OK with doing this for now. As you recall, the case was the broker SSL keystore file being statically set to a path that no longer existed.
> The only config I see that is used in sharedServer/raftManager that is also part of DynamicBrokerConfig#AllDynamicConfigs is the LISTENER_SECURITY_PROTOCOL_MAP_CONFIG. This config is read when building the KRaft network client for the broker during startup, but only the entry for the controller listener is used. From my understanding, the only way to invalidate this static config if it was previously valid would be to mess with server.properties directly and then restart the broker. Is that also a case we want to cover with this change?
No. That configuration is static and cannot be dynamically changed.
> If you are confident that this will handle the case that originally motivated this JIRA then I'm OK with doing this for now. As you recall, the case was the broker SSL keystore file being statically set to a path that no longer existed.
The test I wrote shows that the implementation changes address this case (see the sketch after this list), since it does the following:
- updates the dynamic configs with the current valid static configs, which include the keystore file
- makes sure the broker's most recent snapshot has that config update
- shuts down the broker
- sets the static keystore location config to an invalid file path
- verifies we can start the broker again (with my change this test passes, and without it the test throws a NoSuchFileException for the invalid file path)
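In outline, the first step could look something like this (a sketch, not the PR's actual test code; the listener name "external" and the helper name are invented, and the snapshot/restart steps are cluster-harness specific, so they appear only as comments):

```scala
import java.util.{Collection => JCollection, Collections}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

// Dynamically persist the currently valid keystore path so a ConfigRecord
// for it lands in the broker's metadata snapshot.
def setDynamicKeystoreLocation(admin: Admin, brokerId: Int, keystorePath: String): Unit = {
  val broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString)
  val ops: JCollection[AlterConfigOp] = Collections.singletonList(
    new AlterConfigOp(
      new ConfigEntry("listener.name.external.ssl.keystore.location", keystorePath),
      AlterConfigOp.OpType.SET))
  admin.incrementalAlterConfigs(Collections.singletonMap(broker, ops)).all().get()
}
// Then: wait until the latest metadata snapshot contains the ConfigRecord,
// shut the broker down, point the *static* ssl.keystore.location at a missing
// file, and assert that startup still succeeds.
```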
batch.forEach(record => {
  if (record.message().apiKey() == MetadataRecordType.CONFIG_RECORD.id) {
    val configRecord = record.message().asInstanceOf[ConfigRecord]
    if (DynamicBrokerConfig.AllDynamicConfigs.contains(configRecord.name())) {
This is not quite right, because you aren't distinguishing between broker configurations and other kinds of configuration. You're also assuming that if you find a broker configuration, it applies to this node, which may not be the case.
> This is not quite right, because you aren't distinguishing between broker configurations and other kinds of configuration.
Yeah, I looked at DynamicConfigPublisher. I think when I get the ConfigRecord, I need to check that its resourceType is BROKER and only put those configs in dynamicBrokerConfigs. Then whether we update the cluster defaults or the per-broker configs is based on whether resourceName is empty or contains the broker's id.
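Roughly, that routing would look like this sketch (method and parameter names are illustrative, not the final diff; null values are handled in the follow-up below):

```scala
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.metadata.ConfigRecord

// Keep only BROKER-typed records, then split cluster defaults (empty
// resource name) from this broker's own per-broker overrides.
def routeConfigRecord(record: ConfigRecord,
                      brokerId: Int,
                      dynamicDefaultConfigs: Properties,
                      dynamicPerBrokerConfigs: Properties): Unit = {
  if (record.resourceType() == ConfigResource.Type.BROKER.id()) {
    if (record.resourceName().isEmpty)
      dynamicDefaultConfigs.put(record.name(), record.value())
    else if (record.resourceName() == brokerId.toString)
      dynamicPerBrokerConfigs.put(record.name(), record.value())
  }
}
```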
That is correct. For pedantic correctness, you should also handle the case where we're setting it to null (by deleting it) although I don't expect that to occur in a snapshot.
The approach discussed in my previous comment covers this case, since we're not checking the value() field of the ConfigRecord to determine whether or not we put the key/value pair into the props passed into processConfigChanges. I assume processConfigChanges handles this null value case already.
> I assume processConfigChanges handles this null value case already.
Sorry, but this would be an incorrect assumption! We don't seem to have special handling for null values in the code that translates the dynamicBrokerConfigs map into a Properties object.
Let's just handle this properly by removing the key/value pair from that map when a null value shows up.
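Concretely, the handling described here amounts to (a sketch; the method name is illustrative):

```scala
import java.util.Properties
import org.apache.kafka.common.metadata.ConfigRecord

// A null value in a ConfigRecord means the key was deleted, so remove it
// from the accumulated map rather than storing a null (Properties, being a
// Hashtable, rejects null values anyway).
def applyConfigRecord(record: ConfigRecord, props: Properties): Unit = {
  if (record.value() == null) props.remove(record.name())
  else props.put(record.name(), record.value())
}
```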
LGTM
LGTM
  })
}
val configHandler = new BrokerConfigHandler(config, quotaManagers)
configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
dynamicPerBrokerConfigs should be replaced by dynamicDefaultConfigs here. I have opened https://issues.apache.org/jira/browse/KAFKA-19642 to fix it.
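The corrected ordering would presumably look like this (a sketch reusing the names from the hunk above; assumes an empty resource name denotes the cluster-wide default in this code path):

```scala
// Sketch of the KAFKA-19642 fix: apply cluster-wide defaults (empty resource
// name) first, then this broker's own per-broker overrides.
configHandler.processConfigChanges("", dynamicDefaultConfigs)
configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
```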