Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
public final class ConsumerCleanUpRunner implements CleanUpRunner {
private final @NonNull Map<String, Object> kafkaProperties;
private final @NonNull String groupId;
private final @NonNull ConsumerCleanUpConfiguration cleanHooks;
Comment thread
jkbe marked this conversation as resolved.
private final @NonNull ConsumerCleanUpConfiguration cleanUpConfig;

/**
* Create a new {@code ConsumerCleanUpRunner} with default {@link ConsumerCleanUpConfiguration}
Expand Down Expand Up @@ -110,12 +110,12 @@ private void reset() {
.group(ConsumerCleanUpRunner.this.groupId);
groupClient.reset(OffsetSpec.earliest());

ConsumerCleanUpRunner.this.cleanHooks.runResetHooks();
ConsumerCleanUpRunner.this.cleanUpConfig.runResetHooks();
}

private void clean() {
this.deleteConsumerGroup();
ConsumerCleanUpRunner.this.cleanHooks.runCleanHooks();
ConsumerCleanUpRunner.this.cleanUpConfig.runCleanHooks();
}

private void deleteConsumerGroup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class ConsumerProducerCleanUpRunner implements CleanUpRunner {
private final @NonNull ConsumerProducerTopicConfig topics;
private final @NonNull Map<String, Object> kafkaProperties;
private final @NonNull String groupId;
private final @NonNull ConsumerProducerCleanUpConfiguration cleanHooks;
private final @NonNull ConsumerProducerCleanUpConfiguration cleanUpConfig;

/**
* Create a new {@link ConsumerProducerCleanUpRunner} with default {@link ConsumerProducerCleanUpConfiguration}
Expand Down Expand Up @@ -86,7 +86,7 @@ public static ConsumerProducerCleanUpRunner create(@NonNull final ConsumerProduc

@Override
public void close() {
this.cleanHooks.close();
this.cleanUpConfig.close();
}

@Override
Expand Down Expand Up @@ -118,15 +118,15 @@ private class Task {
private void clean() {
this.deleteConsumerGroup();
this.deleteTopics();
ConsumerProducerCleanUpRunner.this.cleanHooks.runCleanHooks();
ConsumerProducerCleanUpRunner.this.cleanUpConfig.runCleanHooks();
}

private void reset() {
final ConsumerGroupClient groupClient = this.adminClient.consumerGroups()
.group(ConsumerProducerCleanUpRunner.this.groupId);
groupClient.reset(OffsetSpec.earliest());

ConsumerProducerCleanUpRunner.this.cleanHooks.runResetHooks();
ConsumerProducerCleanUpRunner.this.cleanUpConfig.runResetHooks();
}

private void deleteConsumerGroup() {
Expand All @@ -141,7 +141,7 @@ private void deleteTopics() {
private void deleteTopic(final String topic) {
this.adminClient.topics()
.topic(topic).deleteIfExists();
ConsumerProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
ConsumerProducerCleanUpRunner.this.cleanUpConfig.runTopicDeletionHooks(topic);
}

private Iterable<String> getAllOutputTopics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public final class ProducerCleanUpRunner implements CleanUpRunner {
private final @NonNull ProducerTopicConfig topics;
private final @NonNull Map<String, Object> kafkaProperties;
private final @NonNull ProducerCleanUpConfiguration cleanHooks;
private final @NonNull ProducerCleanUpConfiguration cleanUpConfig;

/**
* Create a new {@code ProducerCleanUpRunner} with default {@link ProducerCleanUpConfiguration}
Expand Down Expand Up @@ -78,7 +78,7 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to

@Override
public void close() {
this.cleanHooks.close();
this.cleanUpConfig.close();
}

/**
Expand All @@ -103,7 +103,7 @@ private class Task {

private void clean() {
this.deleteTopics();
ProducerCleanUpRunner.this.cleanHooks.runCleanHooks();
ProducerCleanUpRunner.this.cleanUpConfig.runCleanHooks();
}

private void deleteTopics() {
Expand All @@ -114,7 +114,7 @@ private void deleteTopics() {
private void deleteTopic(final String topic) {
this.adminClient.topics()
.topic(topic).deleteIfExists();
ProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
ProducerCleanUpRunner.this.cleanUpConfig.runTopicDeletionHooks(topic);
}

private Iterable<String> getAllOutputTopics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public final class StreamsCleanUpRunner implements CleanUpRunner {
private final TopologyInformation topologyInformation;
private final Topology topology;
private final @NonNull StreamsConfigX config;
private final @NonNull StreamsCleanUpConfiguration cleanHooks;
private final @NonNull StreamsCleanUpConfiguration cleanUpConfig;

/**
* Create a new {@code StreamsCleanUpRunner} with default {@link StreamsCleanUpConfiguration}
Expand Down Expand Up @@ -160,12 +160,12 @@ private static Collection<String> filterExistingTopics(final Collection<String>

@Override
public void close() {
this.cleanHooks.close();
this.cleanUpConfig.close();
}

/**
* Clean up your Streams app by resetting the app and deleting the output topics
* and consumer group.
* Clean up your Streams app by resetting the app and deleting the output topics and consumer group.
*
* @see #reset()
*/
@Override
Expand Down Expand Up @@ -216,7 +216,7 @@ private void reset(final Collection<String> allTopics) {
try (final KafkaStreams kafkaStreams = this.createStreams()) {
kafkaStreams.cleanUp();
}
StreamsCleanUpRunner.this.cleanHooks.runResetHooks();
StreamsCleanUpRunner.this.cleanUpConfig.runResetHooks();
}

private KafkaStreams createStreams() {
Expand All @@ -233,7 +233,7 @@ private void cleanAndReset() {
private void clean(final Collection<String> allTopics) {
this.deleteOutputTopics(allTopics);
this.deleteConsumerGroup();
StreamsCleanUpRunner.this.cleanHooks.runCleanHooks();
StreamsCleanUpRunner.this.cleanUpConfig.runCleanHooks();
}

private void deleteIntermediateTopics(final Collection<String> allTopics) {
Expand All @@ -248,13 +248,13 @@ private void deleteOutputTopics(final Collection<String> allTopics) {
}

private void resetInternalTopic(final String topic) {
StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
StreamsCleanUpRunner.this.cleanUpConfig.runTopicDeletionHooks(topic);
}

private void deleteTopic(final String topic) {
this.adminClient.topics()
.topic(topic).deleteIfExists();
StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
StreamsCleanUpRunner.this.cleanUpConfig.runTopicDeletionHooks(topic);
}

private void deleteConsumerGroup() {
Expand Down
Loading