Conversation

@peterxcli
Member

Enable brokers to restart with the same port in ClusterTest.

Tests

In the newly added test testBrokerRestart, if we don't have:

for (TestKitNode node : nodes.brokerNodes().values()) {
    socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName);
}

to preallocate the ports for the brokers, the test fails with the following logs and error:

...

[2025-01-03 16:38:19,741] INFO [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1073)
[2025-01-03 16:38:19,742] WARN [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:55465) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:899)
[2025-01-03 16:38:19,742] WARN [Producer clientId=producer-1] Bootstrap broker localhost:55465 (id: -1 rack: null isFenced: false) disconnected (org.apache.kafka.clients.NetworkClient:1256)
[2025-01-03 16:38:20,745] INFO [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1073)
[2025-01-03 16:38:20,745] WARN [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:55465) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:899)
[2025-01-03 16:38:20,745] WARN [Producer clientId=producer-1] Bootstrap broker localhost:55465 (id: -1 rack: null isFenced: false) disconnected (org.apache.kafka.clients.NetworkClient:1256)
[2025-01-03 16:38:21,667] INFO [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1073)
[2025-01-03 16:38:21,667] WARN [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:55465) could not be established. Node may not be available. (org.apache.kafka.clients.NetworkClient:899)
[2025-01-03 16:38:21,667] WARN [Producer clientId=producer-1] Bootstrap broker localhost:55465 (id: -1 rack: null isFenced: false) disconnected (org.apache.kafka.clients.NetworkClient:1256)
[2025-01-03 16:38:21,667] INFO [Producer clientId=producer-1] Rebootstrapping with [localhost/127.0.0.1:55465] (org.apache.kafka.clients.Metadata:314)

...


testBrokerRestart(org.apache.kafka.common.test.api.ClusterInstance) timed out after 60 seconds
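The preallocation loop above behaves like a get-or-create lookup: the first call for a node and listener picks a port, and every later call returns the same one, so a restarted broker keeps the address the producer already bootstrapped against. A minimal sketch of that idea follows. `PortRegistry` and its method are hypothetical names for illustration, not the PR's `socketFactoryManager` implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the PR): remembers one port per (node id, listener name).
final class PortRegistry {
    private final Map<String, Integer> ports = new HashMap<>();

    // Returns the port reserved for this node and listener, choosing a free one on first use.
    synchronized int getOrCreatePort(int nodeId, String listenerName) {
        String key = nodeId + ":" + listenerName;
        Integer existing = ports.get(key);
        if (existing != null) {
            return existing;
        }
        // Bind to port 0 so the OS picks a free ephemeral port, then release it right away.
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            int port = ((InetSocketAddress) channel.getLocalAddress()).getPort();
            ports.put(key, port);
            return port;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the sketch releases the port after picking it, another process could in principle claim it before the broker binds. That is usually tolerable in a test harness, but it is an assumption of this sketch rather than a guarantee.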

@github-actions bot added labels on Jan 3, 2025: triage (PRs from the community); tests (Test fixes, including flaky tests); small (Small PRs)
Comment on lines 52 to 63

if (socketChannel != null) {
    if (socketChannel.isOpen()) {
        return socketChannel;
    }
    // bind the server socket with same port
    socketAddress = new InetSocketAddress(socketAddress.getHostString(), socketChannel.socket().getLocalPort());
    socketChannel = ServerSocketFactory.INSTANCE.openServerSocket(
        listenerName,
        socketAddress,
        listenBacklogSize,
        recvBufferSize);
Member Author

This PR depends on #18337.
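The snippet under review reuses the port of a closed channel when reopening the listener. A self-contained sketch of the same pattern with plain `java.nio` is shown here. `RestartableListener` is a hypothetical class, and unlike the reviewed code it records the port in a field instead of reading it back from the closed channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical illustration: a listener that comes back on the same port after being closed.
final class RestartableListener {
    private ServerSocketChannel channel;
    private int port; // 0 until the first bind, so the OS chooses the initial port

    // Returns the current channel if it is still open; otherwise opens a new one on the remembered port.
    synchronized ServerSocketChannel acquire() {
        if (channel != null && channel.isOpen()) {
            return channel;
        }
        try {
            ServerSocketChannel fresh = ServerSocketChannel.open();
            fresh.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
            port = ((InetSocketAddress) fresh.getLocalAddress()).getPort();
            channel = fresh;
            return fresh;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Closes the current channel, if any, but keeps the port for the next acquire().
    synchronized void close() {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    synchronized int port() {
        return port;
    }
}
```

Calling `acquire()`, then `close()`, then `acquire()` again yields a new channel bound to the port chosen on the first call, which is the behavior a client holding the old bootstrap address needs.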

@github-actions bot added the connect label and removed the triage (PRs from the community) label on Jan 4, 2025
Comment on lines -1434 to -1450
private void useFixedBrokerPort() throws IOException {
    // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
    // config to get a random free port because in this test we want to stop the Kafka broker and then bring it
    // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
    // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
    // on a different random free port the second time it is started. Note that we can only use the static port
    // because we have a single broker setup in this test.
    int listenerPort;
    try (ServerSocket s = new ServerSocket(0)) {
        listenerPort = s.getLocalPort();
    }
    brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort));
    connectBuilder
        .numBrokers(1)
        .brokerProps(brokerProps);
}

Member Author

@peterxcli peterxcli Jan 4, 2025

I removed this so that the following two tests pass.

| Module | Test | Message | Time |
| --- | --- | --- | --- |
| ConnectWorkerIntegrationTest | testBrokerCoordinator() | org.opentest4j.AssertionFailedError: The Kafka cluster used in this test was not able to start successfully in time. If no recent changes have altered the behavior of Kafka brokers or clients, and this error is not occurring frequently, it is probably the result of the testing machine being temporarily overloaded and can be safely ignored. | 62.84s |
| ConnectWorkerIntegrationTest | testRequestTimeouts() | org.opentest4j.AssertionFailedError: The Kafka cluster used in this test was not able to start successfully in time. If no recent changes have altered the behavior of Kafka brokers or clients, and this error is not occurring frequently, it is probably the result of the testing machine being temporarily overloaded and can be safely ignored. | 60.51s |

https://github.com/apache/kafka/actions/runs/12595111592?pr=18381

I don't know why yet; still investigating...

Member Author

@peterxcli peterxcli Jan 4, 2025

Oh, EmbeddedConnect uses EmbeddedKafkaCluster, and EmbeddedKafkaCluster uses KafkaClusterTestKit to build the cluster. Since the brokers are already assigned fixed ports in KafkaClusterTestKit, there is no need to modify LISTENERS_CONFIG for them.

Comment on lines -250 to -251
useFixedBrokerPort();

Member Author

ditto

Comment on lines -816 to -817
useFixedBrokerPort();

Member Author

ditto

@peterxcli peterxcli force-pushed the k18275-use-same-port-after-broker-restart branch from 84e8137 to df01bb6 Compare January 8, 2025 13:28
@github-actions github-actions bot removed the small Small PRs label Jan 8, 2025
@peterxcli
Member Author

peterxcli commented Jan 8, 2025

@github-actions github-actions bot added the small Small PRs label Jan 17, 2025
Comment on lines 210 to 236
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
File jaasFile = null;

private Optional<File> maybeSetupJaasFile() throws Exception {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
JaasModule.plainLoginModule(
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN,
JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
true,
Map.of(
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
)
)
)
)
));
JaasUtils.refreshJavaLoginConfigParam(jaasFile);
JaasUtils.refreshJavaLoginConfigParam(file);
return Optional.of(file);
}
return Optional.empty();
}

public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
Optional<File> jaasFile = maybeSetupJaasFile();
Member Author

This is for resolving the NComplexity error.

Member

@chia7712 chia7712 left a comment

@peterxcli thanks for this patch!

}
}

@ClusterTest(types = {Type.KRAFT}, brokers = 1)
Member

Should we test CO_KRAFT as well?

producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (Admin admin = cluster.admin();
Producer<String, String> producer = cluster.producer(Utils.propsToMap(producerProps))) {
Member

Could you please use Map.of instead?
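One way to read this suggestion is to build the producer configuration directly as an immutable map instead of filling a `Properties` object and converting it. The sketch below assumes the config keys can be written as string literals equal to the `ProducerConfig` constants. `ProducerConfigSketch` and the bootstrap address are placeholders, not code from the PR.

```java
import java.util.Map;

// Hypothetical illustration of building producer configs with Map.of.
final class ProducerConfigSketch {
    // Keys are string literals assumed to match the ProducerConfig constants of the same meaning.
    static Map<String, Object> producerConfigs(String bootstrapServers) {
        return Map.of(
            "bootstrap.servers", bootstrapServers,
            "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }
}
```

`Map.of` returns an unmodifiable map and rejects null keys or values, so any setting that is only known later has to be merged into a new map rather than added in place.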

@peterxcli peterxcli requested a review from chia7712 January 18, 2025 09:56
@chia7712
Member

chia7712 commented Feb 3, 2025

@peterxcli could you please fix the conflicts?

@peterxcli
Member Author

@chia7712 Resolved. PTAL, Thanks!

@chia7712 chia7712 merged commit 0621c0b into apache:trunk Feb 6, 2025
8 of 9 checks passed
pdruley pushed a commit to pdruley/kafka that referenced this pull request Feb 12, 2025
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
Labels

ci-approved connect small Small PRs tests Test fixes (including flaky tests)

3 participants