Skip to content

Conversation

@Parkerhiphop
Copy link
Contributor

@Parkerhiphop Parkerhiphop commented Apr 4, 2025

As described in the JIRA ticket, controlPlaneRequestChannelOpt was
removed from KRaft mode, so there's no need to use the metrics prefix
anymore.

This change removes metricNamePrefix from RequestChannel and the
related files.

It also removes DataPlaneAcceptor#MetricPrefix, since
DataPlaneAcceptor is the only implementation of Acceptor.

Since the implementation of KIP-291 is essentially removed, we can also
remove logAndThreadNamePrefix and DataPlaneAcceptor#ThreadPrefix.

Reviewers: PoAn Yang payang@apache.org, Ken Huang
s7133700@gmail.com, Chia-Ping Tsai chia7712@gmail.com

@github-actions github-actions bot added triage PRs from the community core Kafka Broker small Small PRs labels Apr 4, 2025
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this patch, left a comment

Comment on lines 351 to 352
private val requestQueueSizeMetricName = RequestQueueSizeMetric
private val responseQueueSizeMetricName = ResponseQueueSizeMetric
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge these properties with RequestChannel L47, L48

private val RequestQueueSizeMetric = "RequestQueueSize"
  private val ResponseQueueSizeMetric = "ResponseQueueSize"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st Thanks for the comment!
I've directly used RequestQueueSizeMetric and ResponseQueueSizeMetric to eliminate the redundant variable.

Copy link
Member

@FrankYang0529 FrankYang0529 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 could we also remove DataPlaneAcceptor#ThreadPrefix and DataPlaneAcceptor#MetricPrefix, because the only implementation of Acceptor is DataPlaneAcceptor?

@github-actions github-actions bot removed the triage PRs from the community label Apr 5, 2025
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please try to remove DataPlaneAcceptor.MetricPrefix?

Copy link
Contributor Author

@Parkerhiphop Parkerhiphop Apr 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Yes, I've remove the DataPlaneAcceptor#MetricPrefix.

Copy link
Contributor Author

@Parkerhiphop Parkerhiphop Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated: I’m not sure if we should replace the DataPlaneAcceptor#threadPrefix() method with the constant DataPlaneAcceptor#ThreadPrefix, since threadPrefix() is used within the Acceptor interface.

It seems that Acceptor shouldn’t directly rely on constants defined in its child class (DataPlaneAcceptor), as the comment below suggests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acceptor has only one sub class now, so we can remove the abstract methods which was used for multi sub-classes.

  1. Acceptor#threadPrefix
  2. DataPlaneAcceptor#ThreadPrefix
  3. s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id" -> s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 I got it. Thanks for the explanation.
I’ve removed the threadPrefix method from Acceptor, used the plain string "data-plane" directly in Acceptor, and kept the ThreadPrefix constant in DataPlaneAcceptor so other classes can still reference it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the impl of KIP-291 is basically gone, maybe we can remove logAndThreadNamePrefix too.

Copy link
Contributor Author

@Parkerhiphop Parkerhiphop Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thanks for providing the context!
I found that logAndThreadNamePrefix was the only place using DataPlaneAcceptor#ThreadPrefix, so I removed both.

@chia7712
Copy link
Member

chia7712 commented Apr 5, 2025

could we also remove DataPlaneAcceptor#ThreadPrefix and DataPlaneAcceptor#MetricPrefix, because the only implementation of Acceptor is DataPlaneAcceptor?

DataPlaneAcceptor#MetricPrefix should be removed. DataPlaneAcceptor#ThreadPrefix has some references, so maybe we should keep it. For another, the method DataPlaneAcceptor#threadPrefix can be replaced by DataPlaneAcceptor#ThreadPrefix


def metricPrefix(): String
def threadPrefix(): String
def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's a good idea for a parent class to use constants from its child classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I was focused on fixing the previous test case and missed this—thanks for the suggestion!

import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.network.{SocketServer}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import kafka.network.SocketServer

@chia7712 chia7712 merged commit c8fe551 into apache:trunk Apr 12, 2025
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker small Small PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants