KAFKA-20426: Using both group.id and assign() causes a busy loop in AsyncKafkaConsumer #22018
brandboat wants to merge 1 commit into apache:trunk
      * <p>Similarly, we may have to unblock the application thread to send a {@link AsyncPollEvent} to make sure
      * our poll timer will not expire while we are polling.
      *
-     * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
-     * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
+     * <p>In the event that heartbeats are currently being skipped (e.g., the member is in
+     * {@link MemberState#UNSUBSCRIBED} when using manual assignment), this returns {@code Long.MAX_VALUE}
+     * to indicate there is no next heartbeat to wait for, allowing the application thread to block for
+     * the full user-specified poll timeout rather than spinning in a busy loop.
      */
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
         pollTimer.update(currentTimeMs);
+        if (membershipManager().shouldSkipHeartbeat()) {
+            return Long.MAX_VALUE;
+        }
         if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())) {
             return 0L;
         }
I was wondering if we should handle the pollTimer.isExpired() case here, since shouldSkipHeartbeat() being true causes the method to return Long.MAX_VALUE even if the timer has expired. However, I noticed that the callers use Math.min() with timer.remainingMs(), so it won't affect production behavior. I've also run the unit tests locally and everything looks solid.
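The point about `Math.min()` can be checked in isolation. The following is a minimal sketch, not the Kafka code itself: `WaitClamp` and `effectiveWaitMs` are hypothetical names standing in for a caller that clamps the value returned by `maximumTimeToWait` against the time remaining on its own timer.

```java
// Hypothetical stand-in for a caller that clamps the heartbeat manager's
// answer against the time left on the caller's timer.
final class WaitClamp {
    // maxWaitMs:   value returned by maximumTimeToWait (may be Long.MAX_VALUE)
    // remainingMs: time left on the caller's timer
    static long effectiveWaitMs(long maxWaitMs, long remainingMs) {
        return Math.min(maxWaitMs, remainingMs);
    }

    public static void main(String[] args) {
        // Long.MAX_VALUE never wins the min, so the caller's timer bounds the wait.
        System.out.println(effectiveWaitMs(Long.MAX_VALUE, 1_000L)); // 1000
        // An expired caller timer still yields a zero wait.
        System.out.println(effectiveWaitMs(Long.MAX_VALUE, 0L));     // 0
        // A zero from the heartbeat manager forces an immediate return.
        System.out.println(effectiveWaitMs(0L, 1_000L));             // 0
    }
}
```

Under this assumption, returning `Long.MAX_VALUE` only means "no heartbeat deadline"; the effective wait is still bounded by the caller's remaining time.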
Is shouldSkipHeartbeat() being used to determine if the user is subscribed or not? I don't think they're synonymous 🤔
I wrote a local test case for this.

    @Test
    public void testAssignBusyLoop() {
        time = Time.SYSTEM;
        consumer = newConsumer();
        TopicPartition tp = new TopicPartition("foo", 3);
        // Count how many times the fetch collector is invoked during a single poll().
        AtomicInteger loopCount = new AtomicInteger(0);
        doAnswer(invocation -> {
            loopCount.incrementAndGet();
            return Fetch.empty();
        }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
        completeAssignmentChangeEventSuccessfully();
        // Manual assignment: no group subscription is made.
        consumer.assign(singleton(tp));
        completeAsyncPollEventSuccessfully();
        consumer.poll(Duration.ofSeconds(1));
        // A poll that blocks properly should collect fetches far fewer than 300 times.
        assertTrue(loopCount.get() < 300,
            "Busy loop detected! loopCount:" + loopCount.get());
    }

Unit Test Output: Proves that it indeed causes a busy loop. Subsequently, I used
kirktrue left a comment
Thanks for the PR @brandboat!
When a consumer uses manual assignment (assign()) instead of group
subscription, the member remains in the UNSUBSCRIBED state. In this
state, heartbeats are skipped.
Previously, because the heartbeat interval was initialized to 0, the
maximumTimeToWait calculation would return 0 when heartbeats were skipped. This caused
pollForFetches to return immediately and enter a busy loop, consuming excessive CPU.
This patch fixes the issue by ensuring
maximumTimeToWait returns Long.MAX_VALUE whenever
shouldSkipHeartbeat() is true.

Reviewers: Kirk True <kirk@kirktrue.pro>
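The effect of the busy loop described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not the AsyncKafkaConsumer implementation: `PollLoopModel`, `iterations`, and the fixed 1 ms per-iteration overhead are all hypothetical, and the clock is virtual. It assumes each loop iteration blocks for the minimum of the `maximumTimeToWait` value and the remaining poll time.

```java
import java.util.function.LongSupplier;

// Toy model of a poll loop on a virtual clock; not the real consumer code.
final class PollLoopModel {
    // Assumed fixed cost of one loop iteration, in virtual milliseconds.
    static final long ITERATION_OVERHEAD_MS = 1L;

    // Counts how many iterations a poll with the given timeout performs when
    // each iteration blocks for min(maximumTimeToWait, remaining poll time).
    static int iterations(long pollTimeoutMs, LongSupplier maximumTimeToWait) {
        long elapsedMs = 0L;
        int count = 0;
        while (elapsedMs < pollTimeoutMs) {
            long remainingMs = pollTimeoutMs - elapsedMs;
            long waitMs = Math.min(maximumTimeToWait.getAsLong(), remainingMs);
            elapsedMs += waitMs + ITERATION_OVERHEAD_MS;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Before the fix: a wait of 0 makes every iteration return immediately.
        System.out.println(iterations(1_000L, () -> 0L));             // 1000
        // After the fix: Long.MAX_VALUE is clamped to the remaining poll time.
        System.out.println(iterations(1_000L, () -> Long.MAX_VALUE)); // 1
    }
}
```

In this model a 1-second poll spins 1000 times when the wait is 0, and blocks once when the wait is `Long.MAX_VALUE`. The real iteration counts depend on actual per-iteration cost, which the model does not attempt to measure.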