Skip to content

Commit 77732ca

Browse files
committed
chore(trino): Backport Kafka offset handling to 477
Backport trinodb/trino#26789 to Trino 477
1 parent b1f06e7 commit 77732ca

1 file changed

Lines changed: 26 additions & 0 deletions

File tree

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
From a46ead926108b561ca6fa6b193dd38db5a8cb125 Mon Sep 17 00:00:00 2001
2+
From: Sebastian Bernauer <sebastian.bernauer@stackable.tech>
3+
Date: Wed, 7 Jan 2026 13:30:17 +0100
4+
Subject: Fix findOffsetsForTimestampGreaterOrEqual
5+
6+
Back-port of https://github.com/trinodb/trino/pull/26789
7+
8+
Co-authored-by: Mateusz "Serafin" Gajewski <github@wendigo.pl>
9+
---
10+
.../main/java/io/trino/plugin/kafka/KafkaFilterManager.java | 3 +--
11+
1 file changed, 1 insertion(+), 2 deletions(-)
12+
13+
diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
14+
index 5c853d9011..25d3c83301 100644
15+
--- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
16+
+++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
17+
@@ -182,8 +182,7 @@ public class KafkaFilterManager
18+
{
19+
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestamps = kafkaConsumer.offsetsForTimes(timestamps);
20+
return topicPartitionOffsetAndTimestamps.entrySet().stream()
21+
- .collect(toMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue())
22+
- .map(OffsetAndTimestamp::offset)));
23+
+ .collect(toMap(Map.Entry::getKey, entry -> Optional.ofNullable(entry.getValue()).map(OffsetAndTimestamp::offset)));
24+
}
25+
26+
private static Map<TopicPartition, Long> overridePartitionBeginOffsets(Map<TopicPartition, Long> partitionBeginOffsets,

0 commit comments

Comments
 (0)