diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml new file mode 100644 index 0000000..6f44a35 --- /dev/null +++ b/.github/workflows/bot.yml @@ -0,0 +1,67 @@ +name: Bot Build + +on: + workflow_dispatch: + pull_request: + paths: + - .github/workflows/bot.yml + - bot/** + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/bot + +jobs: + build: + runs-on: ubuntu-latest + permissions: + packages: write + pull-requests: write + + steps: + - uses: actions/checkout@v4 + + - name: Set up Java 22 + uses: actions/setup-java@v4 + with: + java-version: '22' + distribution: 'temurin' + cache: maven + + - name: Maven package + run: mvn -f bot/pom.xml clean package + + - id: jacoco + uses: madrapps/jacoco-report@v1.6.1 + if: github.event_name != 'workflow_dispatch' + with: + paths: bot/target/site/jacoco/jacoco.xml + token: ${{ secrets.GITHUB_TOKEN }} + min-coverage-overall: 30 + min-coverage-changed-files: 30 + title: Code Coverage + update-comment: true + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build & push Docker image + uses: docker/build-push-action@v4 + with: + context: . + file: bot.Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + build-args: | + TELEGRAM_TOKEN=${{ secrets.TELEGRAM_TOKEN }} diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml deleted file mode 100644 index 113e9b5..0000000 --- a/.github/workflows/build.yaml +++ /dev/null @@ -1,54 +0,0 @@ -name: Build - -on: - workflow_dispatch: - pull_request: - -jobs: - build: - runs-on: ubuntu-latest - name: Build - permissions: - contents: read - packages: write - pull-requests: write - - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 - with: - java-version: '23' - distribution: 'temurin' - cache: maven - - - name: maven build - run: mvn verify - - - id: jacoco - uses: madrapps/jacoco-report@v1.7.1 - if: ( github.event_name != 'workflow_dispatch' ) - with: - paths: ${{ github.workspace }}/report/target/site/jacoco/jacoco.xml - token: ${{ secrets.GITHUB_TOKEN }} - min-coverage-overall: 30 - min-coverage-changed-files: 30 - title: Code Coverage - update-comment: true - - linter: - name: linter - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - pull-requests: write - - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 - with: - java-version: '23' - distribution: 'temurin' - cache: maven - - - run: mvn compile -am spotless:check modernizer:modernizer spotbugs:check pmd:check pmd:cpd-check diff --git a/.github/workflows/scrapper.yml b/.github/workflows/scrapper.yml new file mode 100644 index 0000000..d9f679a --- /dev/null +++ b/.github/workflows/scrapper.yml @@ -0,0 +1,68 @@ +name: Scrapper Build + +on: + workflow_dispatch: + pull_request: + paths: + - .github/workflows/scrapper.yml + - scrapper/** + - link-parser/** + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }}/scrapper + +jobs: + build: + runs-on: ubuntu-latest + permissions: + packages: write + pull-requests: write + + steps: + - uses: actions/checkout@v4 + + - name: Set up Java 21 + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'temurin' + cache: maven + + - name: Maven package + run: mvn -f scrapper/pom.xml clean package + + - id: jacoco + uses: madrapps/jacoco-report@v1.6.1 + if: github.event_name != 'workflow_dispatch' + with: + paths: scrapper/target/site/jacoco/jacoco.xml + token: ${{ secrets.GITHUB_TOKEN }} + min-coverage-overall: 30 + min-coverage-changed-files: 30 + title: Code Coverage + update-comment: true + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v2 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build & push Docker image + uses: docker/build-push-action@v4 + with: + context: . + file: scrapper.Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + build-args: | + SO_TOKEN_KEY=${{ secrets.SO_TOKEN_KEY }} diff --git a/avro-schemas/pom.xml b/avro-schemas/pom.xml new file mode 100644 index 0000000..8d8df74 --- /dev/null +++ b/avro-schemas/pom.xml @@ -0,0 +1,63 @@ + + + 4.0.0 + + + + backend.academy + root + 1.0 + ../pom.xml + + + backend.academy + avro-schemas + 1.0 + jar + Avro Schemas Module + + + UTF-8 + + + + + + org.apache.avro + avro + 1.11.3 + + + + + + + + org.apache.maven.plugins + maven-pmd-plugin + 3.26.0 + + true + + + + org.apache.avro + avro-maven-plugin + 1.11.3 + + + generate-avro-sources + + schema + + generate-sources + + ${project.basedir}/src/main/avro + String + + + + + + + diff --git a/avro-schemas/src/main/avro/NotificationRequest.avsc b/avro-schemas/src/main/avro/NotificationRequest.avsc new file mode 100644 index 0000000..f909b59 --- /dev/null +++ b/avro-schemas/src/main/avro/NotificationRequest.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "backend.academy.avro", + "type": "record", + "name": "NotificationRequest", + "fields": [ + { + "name": "message", + "type": "string" + }, + { + "name": "userId", + "type": "long" + } + ] +} diff --git a/bot/bot.Dockerfile b/bot/bot.Dockerfile new file mode 100644 index 0000000..b0a5a13 --- /dev/null +++ b/bot/bot.Dockerfile @@ -0,0 +1,14 @@ +FROM maven:3.8.8-eclipse-temurin-22 AS build +WORKDIR /workspace +COPY bot/pom.xml bot/pom.xml +COPY bot/src bot/src +RUN mvn -f bot/pom.xml clean package -DskipTests + +FROM eclipse-temurin:22-jre-alpine +WORKDIR /app +COPY --from=build /workspace/bot/target/bot.jar ./bot.jar + +ARG APP_TELEGRAM_TOKEN +ENV APP_TELEGRAM_TOKEN=${APP_TELEGRAM_TOKEN} +EXPOSE 8090 +ENTRYPOINT ["sh", "-c", "java -jar -Dapp.telegram-token=$APP_TELEGRAM_TOKEN /app/bot.jar"] diff --git a/bot/pom.xml b/bot/pom.xml index 064cae5..b75a614 100644 --- a/bot/pom.xml +++ b/bot/pom.xml @@ -1,16 +1,49 @@ - + 4.0.0 backend.academy root - ${revision} + 1.0 + ../pom.xml bot + + 2.18.0 + 1.11.3 + + + + backend.academy + avro-schemas + 1.0 + + + io.github.resilience4j + resilience4j-spring-boot2 + 1.7.1 + + + io.github.resilience4j + resilience4j-reactor + 1.7.1 + + + com.bucket4j + bucket4j_jdk17-core + 8.14.0 + + + org.apache.avro + avro + ${avro.version} + compile + com.github.pengrad java-telegram-bot-api @@ -44,10 +77,10 @@ - - - - + + org.springframework.kafka + spring-kafka + @@ -75,11 +108,17 @@ true + + + + + + + - org.springframework.boot - spring-boot-devtools - runtime - true + org.springframework.cloud + spring-cloud-starter-contract-stub-runner + 4.1.2 @@ -112,11 +151,21 @@ kafka test - - - - - + + org.springframework.kafka + spring-kafka-test + test + + + com.redis + testcontainers-redis + test + + + commons-io + commons-io + + diff --git a/bot/src/main/java/backend/academy/bot/BotConfig.java b/bot/src/main/java/backend/academy/bot/BotConfig.java index 63f184d..8164ee4 100644 --- a/bot/src/main/java/backend/academy/bot/BotConfig.java +++ b/bot/src/main/java/backend/academy/bot/BotConfig.java @@ -1,19 +1,42 @@ package backend.academy.bot; +import backend.academy.bot.config.CircuitBreakerProps; +import backend.academy.bot.config.HttpTimeoutProps; +import backend.academy.bot.config.RetryProps; import com.pengrad.telegrambot.TelegramBot; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import jakarta.validation.constraints.NotEmpty; +import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.validation.annotation.Validated; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.util.retry.Retry; @Validated -@ConfigurationProperties(prefix = "app", ignoreUnknownFields = false) +@ConfigurationProperties(prefix = "app", ignoreUnknownFields = true) @EnableAsync +@ConfigurationPropertiesScan public record BotConfig(@NotEmpty String telegramToken) implements AsyncConfigurer { @Bean public TelegramBot telegramBot(BotConfig botConfig) { @@ -21,8 +44,73 @@ public TelegramBot telegramBot(BotConfig botConfig) { } @Bean - public WebClient webClient() { - return WebClient.builder().build(); + public CircuitBreaker circuitBreaker(CircuitBreakerProps props) { + CircuitBreakerConfig cfg = CircuitBreakerConfig.custom() + .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .slidingWindowSize(props.slidingWindowSize()) + .minimumNumberOfCalls(props.minimumRequiredCalls()) + .failureRateThreshold(props.failureRateThreshold()) + .permittedNumberOfCallsInHalfOpenState(props.permittedCallsInHalfOpenState()) + .waitDurationInOpenState(props.waitDurationInOpenState()) + .recordExceptions(Throwable.class) + .build(); + + return CircuitBreaker.of("backendServiceCb", cfg); + } + + @Bean(name = "notificationCb") + public CircuitBreaker notificationCircuitBreaker(CircuitBreakerRegistry registry) { + return registry.circuitBreaker("notificationCb"); + } + + + @Bean + public WebClient webClient(HttpTimeoutProps t, RetryProps retryProps, CircuitBreakerProps circuitBreakerProps) { + HttpClient http = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) t.connectTimeout().toMillis()) + .responseTimeout(t.responseTimeout()) + .doOnConnected(conn -> conn + .addHandlerLast(new ReadTimeoutHandler( + t.readTimeout().toMillis(), TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler( + t.writeTimeout().toMillis(), TimeUnit.MILLISECONDS))); + + + ExchangeFilterFunction cbFilter = (request, next) -> + next.exchange(request) + .transformDeferred(CircuitBreakerOperator.of(circuitBreaker(circuitBreakerProps))); + + ExchangeFilterFunction retryFilter = (request, next) -> next.exchange(request) + .flatMap(response -> { + if (shouldRetry(request, response, retryProps)) { + return response.createException() + .flatMap(Mono::error); + } + return Mono.just(response); + }) + .retryWhen(Retry.fixedDelay(retryProps.attempts(), retryProps.backoff()) + .filter(ex -> isRetryable(ex, retryProps))); + + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(http)) + .filter(cbFilter) + .filter(retryFilter) + .build(); + } + + private boolean shouldRetry(ClientRequest req, ClientResponse resp, RetryProps rp) { + boolean codeMatch = rp.statusCodes().contains(resp.statusCode().value()); + boolean methodOk = req.method() != null && + List.of(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.PUT, + HttpMethod.DELETE, HttpMethod.OPTIONS) + .contains(req.method()); + return codeMatch && methodOk; + } + + private boolean isRetryable(Throwable ex, RetryProps rp) { + return ex instanceof WebClientResponseException wce && + rp.statusCodes().contains(wce.getRawStatusCode()); } @Bean(name = "botTaskExecutor") diff --git a/bot/src/main/java/backend/academy/bot/client/ScrapperClient.java b/bot/src/main/java/backend/academy/bot/client/ScrapperClient.java index d6d77b0..ab355ac 100644 --- a/bot/src/main/java/backend/academy/bot/client/ScrapperClient.java +++ b/bot/src/main/java/backend/academy/bot/client/ScrapperClient.java @@ -5,8 +5,8 @@ import backend.academy.bot.model.dto.UntrackingResponse; import backend.academy.bot.model.dto.UserRegistrationRequest; import backend.academy.bot.model.dto.UserRegistrationResponse; -import backend.academy.bot.model.entities.Link; -import backend.academy.bot.services.NotificationService; +import backend.academy.bot.model.entity.Link; +import backend.academy.bot.service.notification.HttpNotificationService; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,15 +21,12 @@ public class ScrapperClient { private static final Logger log = LoggerFactory.getLogger(ScrapperClient.class); private final WebClient webClient; - private final NotificationService notificationService; private final String scrapperBaseUrl; public ScrapperClient( WebClient webClient, - NotificationService notificationService, @Value("${scrapper.base-url}") String scrapperBaseUrl) { this.webClient = webClient; - this.notificationService = notificationService; this.scrapperBaseUrl = scrapperBaseUrl; } diff --git a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/HelpCommandHandler.java b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/HelpCommandHandler.java index dc502bc..8cf88c6 100644 --- a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/HelpCommandHandler.java +++ b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/HelpCommandHandler.java @@ -1,6 +1,5 @@ package backend.academy.bot.commandHandlers.handlers; -import backend.academy.bot.commandHandlers.CommandDispatcher; import com.pengrad.telegrambot.TelegramBot; import com.pengrad.telegrambot.model.Update; import com.pengrad.telegrambot.request.SendMessage; @@ -21,10 +20,6 @@ public boolean supports(Update update) { @Override public void handle(Update update) { - if (!CommandDispatcher.isIsStarted()) { - telegramBot.execute(new SendMessage(update.message().chat().id(), "Сначала нужно зарегистрироваться.")); - return; - } String responseText = """ /start - регистрация пользователя.\n diff --git a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/ListCommandHandler.java b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/ListCommandHandler.java index c1c711e..3abcc79 100644 --- a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/ListCommandHandler.java +++ b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/ListCommandHandler.java @@ -1,7 +1,6 @@ package backend.academy.bot.commandHandlers.handlers; -import backend.academy.bot.client.ScrapperClient; -import backend.academy.bot.model.entities.Link; +import backend.academy.bot.service.LinkCacheService; import com.pengrad.telegrambot.TelegramBot; import com.pengrad.telegrambot.model.Update; import com.pengrad.telegrambot.request.SendMessage; @@ -14,10 +13,10 @@ public class ListCommandHandler implements CommandHandler { private static final Logger log = LoggerFactory.getLogger(ListCommandHandler.class); private final TelegramBot telegramBot; - private final ScrapperClient scrapperClient; + private final LinkCacheService linkCacheService; - public ListCommandHandler(ScrapperClient scrapperClient, TelegramBot telegramBot) { - this.scrapperClient = scrapperClient; + public ListCommandHandler(LinkCacheService linkCacheService, TelegramBot telegramBot) { + this.linkCacheService = linkCacheService; this.telegramBot = telegramBot; } @@ -32,45 +31,33 @@ public void handle(Update update) { if (update.message() == null || update.message().from() == null) { return; } - log.info("Обрабатываем команду /list от {}", update.message().from().id()); - long telegramId = Long.parseLong(update.message().from().id().toString()); - scrapperClient - .getLinksByUserId(telegramId) + long telegramId = update.message().from().id(); + log.info("Обрабатываем команду /list от {}", telegramId); + + linkCacheService + .getLinks(telegramId) .subscribe( links -> { if (links.isEmpty()) { - log.info( - "Ответ на /list: пусто для {}", - update.message().from().id()); sendMessage(update, "Пока тут ничего нет"); } else { StringBuilder sb = new StringBuilder(); for (int i = 0; i < links.size(); i++) { - Link link = links.get(i); sb.append(i) .append(". ") - .append(link.getLink()) + .append(links.get(i).getLink()) .append("\n"); } - log.info( - "Ответ на /list: есть данные для {}:{}", - update.message().from().id(), - sb); sendMessage(update, sb.toString()); } }, error -> { - log.error( - "Ошибка при получении ссылок для пользователя {}: {}", - telegramId, - error.getMessage()); + log.error("Ошибка при получении ссылок для {}: {}", telegramId, error.getMessage()); sendMessage(update, "Ошибка при получении списка ссылок: " + error.getMessage()); }); } private void sendMessage(Update update, String text) { - Long chatId = update.message().chat().id(); - SendMessage sendMessage = new SendMessage(chatId, text); - telegramBot.execute(sendMessage); + telegramBot.execute(new SendMessage(update.message().chat().id(), text)); } } diff --git a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/StartCommandHandler.java b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/StartCommandHandler.java index 5a88a30..b222535 100644 --- a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/StartCommandHandler.java +++ b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/StartCommandHandler.java @@ -30,14 +30,14 @@ public void handle(Update update) { .registerUser(telegramId, username) .subscribe( response -> { - if (response.isExists()) { + sendMessage(update, "Пользователь зарегистрирован"); + }, + error -> { + if (error.getMessage().contains("уже существует")) { sendMessage(update, "Пользователь уже зарегистрирован"); } else { - sendMessage(update, "Пользователь зарегистрирован"); + sendMessage(update, "Ошибка при регистрации: " + error.getMessage()); } - }, - error -> { - sendMessage(update, "Ошибка при регистрации: " + error.getMessage()); }); } } diff --git a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/TrackCommandHandler.java b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/TrackCommandHandler.java index 3b20e34..fc6c209 100644 --- a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/TrackCommandHandler.java +++ b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/TrackCommandHandler.java @@ -1,6 +1,7 @@ package backend.academy.bot.commandHandlers.handlers; import backend.academy.bot.client.ScrapperClient; +import backend.academy.bot.service.LinkCacheService; import backend.academy.bot.states.TrackCommandState; import backend.academy.bot.states.TrackStateManager; import com.pengrad.telegrambot.TelegramBot; @@ -24,11 +25,17 @@ public class TrackCommandHandler implements CommandHandler { private static final String TRACK_COMMAND = "/track"; private static final String SEP_REGEX = "\\s+"; private static final Logger log = LoggerFactory.getLogger(TrackCommandHandler.class); + private final LinkCacheService linkCacheService; - public TrackCommandHandler(TrackStateManager stateManager, TelegramBot telegramBot, ScrapperClient scrapperClient) { + public TrackCommandHandler( + TrackStateManager stateManager, + TelegramBot telegramBot, + ScrapperClient scrapperClient, + LinkCacheService linkCacheService) { this.stateManager = stateManager; this.telegramBot = telegramBot; this.scrapperClient = scrapperClient; + this.linkCacheService = linkCacheService; } @Override @@ -83,6 +90,7 @@ public void handle(Update update) { userId, new ArrayList<>(state.getTags()), new ArrayList<>(state.getFilters())) + .flatMap(unused -> linkCacheService.invalidateCache(userId)) .subscribe( unused -> {}, error -> telegramBot.execute(new SendMessage( diff --git a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/UntrackCommandHandler.java b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/UntrackCommandHandler.java index 4488835..6adb249 100644 --- a/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/UntrackCommandHandler.java +++ b/bot/src/main/java/backend/academy/bot/commandHandlers/handlers/UntrackCommandHandler.java @@ -2,12 +2,14 @@ import backend.academy.bot.client.ScrapperClient; import backend.academy.bot.model.dto.UntrackingResponse; +import backend.academy.bot.service.LinkCacheService; import com.pengrad.telegrambot.TelegramBot; import com.pengrad.telegrambot.model.Update; import com.pengrad.telegrambot.request.SendMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; @Component public class UntrackCommandHandler implements CommandHandler { @@ -15,10 +17,13 @@ public class UntrackCommandHandler implements CommandHandler { private static final Logger log = LoggerFactory.getLogger(UntrackCommandHandler.class); private final TelegramBot telegramBot; private final ScrapperClient scrapperClient; + private final LinkCacheService linkCacheService; - public UntrackCommandHandler(TelegramBot telegramBot, ScrapperClient scrapperClient) { + public UntrackCommandHandler( + TelegramBot telegramBot, ScrapperClient scrapperClient, LinkCacheService linkCacheService) { this.telegramBot = telegramBot; this.scrapperClient = scrapperClient; + this.linkCacheService = linkCacheService; } @Override @@ -45,6 +50,13 @@ public void handle(Update update) { scrapperClient .removeTracking(link, userId) + .flatMap(response -> { + if (response.isRemoved()) { + return linkCacheService.invalidateCache(userId).thenReturn(response); + } else { + return Mono.just(response); + } + }) .subscribe( (UntrackingResponse response) -> { if (response.isRemoved()) { diff --git a/bot/src/main/java/backend/academy/bot/config/CircuitBreakerProps.java b/bot/src/main/java/backend/academy/bot/config/CircuitBreakerProps.java new file mode 100644 index 0000000..ad0bdc9 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/CircuitBreakerProps.java @@ -0,0 +1,17 @@ +package backend.academy.bot.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; +import java.time.Duration; + +@Validated +@ConfigurationProperties(prefix = "app.http.circuit-breaker") +public record CircuitBreakerProps( + @Min(1) int slidingWindowSize, + @Min(1) int minimumRequiredCalls, + @Min(1) int failureRateThreshold, + @Min(1) int permittedCallsInHalfOpenState, + @NotNull Duration waitDurationInOpenState) { } + diff --git a/bot/src/main/java/backend/academy/bot/config/HttpTimeoutProps.java b/bot/src/main/java/backend/academy/bot/config/HttpTimeoutProps.java new file mode 100644 index 0000000..4666f86 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/HttpTimeoutProps.java @@ -0,0 +1,15 @@ +package backend.academy.bot.config; + +import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; +import java.time.Duration; + +@Validated +@ConfigurationProperties(prefix = "app.http") +public record HttpTimeoutProps( + @NotNull Duration connectTimeout, + @NotNull Duration readTimeout, + @NotNull Duration writeTimeout, + @NotNull Duration responseTimeout) { } + diff --git a/bot/src/main/java/backend/academy/bot/config/KafkaConfig.java b/bot/src/main/java/backend/academy/bot/config/KafkaConfig.java new file mode 100644 index 0000000..521f37f --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/KafkaConfig.java @@ -0,0 +1,70 @@ +// src/main/java/backend/academy/bot/config/KafkaConfig.java +package backend.academy.bot.config; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries; + +@Configuration +public class KafkaConfig { + + @Value("${app.topics.input}") + private String inputTopic; + + @Value("${app.topics.dead-letter}") + private String deadLetterTopic; + + @Bean + public NewTopic inputTopic() { + return TopicBuilder.name(inputTopic).partitions(1).replicas(1).build(); + } + + @Bean + public NewTopic deadLetterTopic() { + return TopicBuilder.name(deadLetterTopic).partitions(1).replicas(1).build(); + } + + @Bean + public DeadLetterPublishingRecoverer recoverer(KafkaTemplate template) { + return new DeadLetterPublishingRecoverer( + template, (record, ex) -> new TopicPartition(deadLetterTopic, record.partition())); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory cf, + DeadLetterPublishingRecoverer recoverer, + @Value("${app.listener.concurrency}") int concurrency) { + + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(cf); + factory.setConcurrency(concurrency); + factory.setBatchListener(true); + + var props = factory.getContainerProperties(); + props.setAckMode(AckMode.MANUAL_IMMEDIATE); + props.setPollTimeout(1_000); + props.setIdleBetweenPolls(500); + props.setObservationEnabled(true); + + var backOff = new ExponentialBackOffWithMaxRetries(5); + backOff.setInitialInterval(500); + backOff.setMultiplier(2); + backOff.setMaxInterval(10_000); + + var eh = new DefaultErrorHandler(recoverer, backOff); + eh.addNotRetryableExceptions(IllegalArgumentException.class); + factory.setCommonErrorHandler(eh); + return factory; + } +} diff --git a/bot/src/main/java/backend/academy/bot/config/RateLimitConfig.java b/bot/src/main/java/backend/academy/bot/config/RateLimitConfig.java new file mode 100644 index 0000000..071c105 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/RateLimitConfig.java @@ -0,0 +1,77 @@ +package backend.academy.bot.config; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.servlet.FilterRegistrationBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.web.filter.OncePerRequestFilter; + +@Configuration +@EnableConfigurationProperties(RateLimitProps.class) +public class RateLimitConfig { + + private final RateLimitProps props; + private final ConcurrentMap buckets = new ConcurrentHashMap<>(); + + public RateLimitConfig(RateLimitProps props) { + this.props = props; + } + + @Bean + public FilterRegistrationBean rateLimitFilter() { + OncePerRequestFilter filter = new OncePerRequestFilter() { + @Override + protected void doFilterInternal( + HttpServletRequest req, HttpServletResponse res, FilterChain chain) + throws IOException, ServletException { + + String ip = extractClientIp(req); + String path = req.getRequestURI(); + String key = ip + "::" + path; + + Bucket bucket = buckets.computeIfAbsent(key, it -> newBucketFor(path)); + if (bucket.tryConsume(1)) { + chain.doFilter(req, res); + } else { + RateLimitProps.RateLimitSettings s = settingsFor(path); + res.setStatus(s.getErrorCode()); + res.getWriter().write(s.getErrorMessage()); + } + } + }; + + FilterRegistrationBean reg = new FilterRegistrationBean<>(filter); + reg.setOrder(Ordered.HIGHEST_PRECEDENCE); + return reg; + } + + private Bucket newBucketFor(String path) { + RateLimitProps.RateLimitSettings s = settingsFor(path); + Refill refill = Refill.intervally(s.getRefillTokens(), s.getPeriod()); + Bandwidth limit = Bandwidth.classic(s.getCapacity(), refill); + return Bucket.builder().addLimit(limit).build(); + } + + private RateLimitProps.RateLimitSettings settingsFor(String path) { + return props.getPerEndpoint().getOrDefault(path, props.getDefaults()); + } + + private String extractClientIp(HttpServletRequest req) { + String xf = req.getHeader("X-Forwarded-For"); + if (xf != null && !xf.isBlank()) { + return xf.split(",")[0].trim(); + } + return req.getRemoteAddr(); + } +} diff --git a/bot/src/main/java/backend/academy/bot/config/RateLimitProps.java b/bot/src/main/java/backend/academy/bot/config/RateLimitProps.java new file mode 100644 index 0000000..5afdc15 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/RateLimitProps.java @@ -0,0 +1,91 @@ +package backend.academy.bot.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Validated +@ConfigurationProperties(prefix = "app.rate-limit") +public class RateLimitProps { + + @NotNull + private RateLimitSettings defaults; + + @NotNull + private Map perEndpoint = new HashMap<>(); + + public RateLimitSettings getDefaults() { + return defaults; + } + + public void setDefaults(RateLimitSettings defaults) { + this.defaults = defaults; + } + + public Map getPerEndpoint() { + return perEndpoint; + } + + public void setPerEndpoint(Map perEndpoint) { + this.perEndpoint = perEndpoint; + } + + @Validated + public static class RateLimitSettings { + @Min(1) + private int capacity; + @Min(1) + private int refillTokens; + @NotNull + private Duration period; + @Min(100) + private int errorCode; + @NotEmpty + private String errorMessage; + + public int getCapacity() { + return capacity; + } + + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + public int getRefillTokens() { + return refillTokens; + } + + public void setRefillTokens(int refillTokens) { + this.refillTokens = refillTokens; + } + + public Duration getPeriod() { + return period; + } + + public void setPeriod(Duration period) { + this.period = period; + } + + public int getErrorCode() { + return errorCode; + } + + public void setErrorCode(int errorCode) { + this.errorCode = errorCode; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + } +} diff --git a/bot/src/main/java/backend/academy/bot/config/RedisConfig.java b/bot/src/main/java/backend/academy/bot/config/RedisConfig.java new file mode 100644 index 0000000..4f22abf --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/RedisConfig.java @@ -0,0 +1,30 @@ +package backend.academy.bot.config; + +import backend.academy.bot.model.entity.Link; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +@Configuration +public class RedisConfig { + + @Bean + public ReactiveRedisOperations linkRedisOperations( + ReactiveRedisConnectionFactory factory, RedisSerializationContext serializationContext) { + return new ReactiveRedisTemplate<>(factory, serializationContext); + } + + @Bean + public RedisSerializationContext serializationContext() { + StringRedisSerializer keySer = new StringRedisSerializer(); + Jackson2JsonRedisSerializer valSer = new Jackson2JsonRedisSerializer<>(Link.class); + return RedisSerializationContext.newSerializationContext(keySer) + .value(valSer) + .build(); + } +} diff --git a/bot/src/main/java/backend/academy/bot/config/RetryProps.java b/bot/src/main/java/backend/academy/bot/config/RetryProps.java new file mode 100644 index 0000000..13145fe --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/config/RetryProps.java @@ -0,0 +1,17 @@ +package backend.academy.bot.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; +import java.time.Duration; +import java.util.List; + +@Validated +@ConfigurationProperties(prefix = "app.http.retry") +public record RetryProps( + @NotNull @Min(1) Integer attempts, + @NotNull Duration backoff, + @NotEmpty List statusCodes) { } + diff --git a/bot/src/main/java/backend/academy/bot/controller/BotNotificationController.java b/bot/src/main/java/backend/academy/bot/controller/BotNotificationController.java index 4a9ed39..d8c9d20 100644 --- a/bot/src/main/java/backend/academy/bot/controller/BotNotificationController.java +++ b/bot/src/main/java/backend/academy/bot/controller/BotNotificationController.java @@ -1,7 +1,7 @@ package backend.academy.bot.controller; import backend.academy.bot.model.dto.NotificationRequest; -import backend.academy.bot.services.NotificationService; +import backend.academy.bot.service.notification.HttpNotificationService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; @@ -21,9 +21,9 @@ @Validated public class BotNotificationController { private static final Logger log = LoggerFactory.getLogger(BotNotificationController.class); - private final NotificationService notificationService; + private final HttpNotificationService notificationService; - public BotNotificationController(NotificationService notificationService) { + public BotNotificationController(HttpNotificationService notificationService) { this.notificationService = notificationService; } diff --git a/bot/src/main/java/backend/academy/bot/exception/WrongCommandException.java b/bot/src/main/java/backend/academy/bot/exception/WrongCommandException.java new file mode 100644 index 0000000..a7ee56c --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/exception/WrongCommandException.java @@ -0,0 +1,20 @@ +package backend.academy.bot.exception; + +public class WrongCommandException extends Exception { + + public WrongCommandException() { + super(); + } + + public WrongCommandException(String message) { + super(message); + } + + public WrongCommandException(String message, Throwable cause) { + super(message, cause); + } + + public WrongCommandException(Throwable cause) { + super(cause); + } +} diff --git a/bot/src/main/java/backend/academy/bot/model/dto/NotificationRequest.java b/bot/src/main/java/backend/academy/bot/model/dto/NotificationRequest.java index 906277f..64da714 100644 --- a/bot/src/main/java/backend/academy/bot/model/dto/NotificationRequest.java +++ b/bot/src/main/java/backend/academy/bot/model/dto/NotificationRequest.java @@ -2,7 +2,7 @@ public class NotificationRequest { private String message; - private long userId; + private Long userId; public NotificationRequest() {} diff --git a/bot/src/main/java/backend/academy/bot/model/dto/UserRegistrationRequest.java b/bot/src/main/java/backend/academy/bot/model/dto/UserRegistrationRequest.java index b088c8f..a5bc90f 100644 --- a/bot/src/main/java/backend/academy/bot/model/dto/UserRegistrationRequest.java +++ b/bot/src/main/java/backend/academy/bot/model/dto/UserRegistrationRequest.java @@ -1,6 +1,6 @@ package backend.academy.bot.model.dto; -import backend.academy.bot.model.entities.User; +import backend.academy.bot.model.entity.User; public class UserRegistrationRequest { private long userId; diff --git a/bot/src/main/java/backend/academy/bot/model/entities/Filter.java b/bot/src/main/java/backend/academy/bot/model/entity/Filter.java similarity index 93% rename from bot/src/main/java/backend/academy/bot/model/entities/Filter.java rename to bot/src/main/java/backend/academy/bot/model/entity/Filter.java index a3c0f40..98d01f4 100644 --- a/bot/src/main/java/backend/academy/bot/model/entities/Filter.java +++ b/bot/src/main/java/backend/academy/bot/model/entity/Filter.java @@ -1,4 +1,4 @@ -package backend.academy.bot.model.entities; +package backend.academy.bot.model.entity; import java.util.Set; diff --git a/bot/src/main/java/backend/academy/bot/model/entities/Link.java b/bot/src/main/java/backend/academy/bot/model/entity/Link.java similarity index 96% rename from bot/src/main/java/backend/academy/bot/model/entities/Link.java rename to bot/src/main/java/backend/academy/bot/model/entity/Link.java index 1279680..dcbbba7 100644 --- a/bot/src/main/java/backend/academy/bot/model/entities/Link.java +++ b/bot/src/main/java/backend/academy/bot/model/entity/Link.java @@ -1,4 +1,4 @@ -package backend.academy.bot.model.entities; +package backend.academy.bot.model.entity; import java.util.Set; diff --git a/bot/src/main/java/backend/academy/bot/model/entities/Tag.java b/bot/src/main/java/backend/academy/bot/model/entity/Tag.java similarity index 95% rename from bot/src/main/java/backend/academy/bot/model/entities/Tag.java rename to bot/src/main/java/backend/academy/bot/model/entity/Tag.java index ff33d88..f6cfbd3 100644 --- a/bot/src/main/java/backend/academy/bot/model/entities/Tag.java +++ b/bot/src/main/java/backend/academy/bot/model/entity/Tag.java @@ -1,4 +1,4 @@ -package backend.academy.bot.model.entities; +package backend.academy.bot.model.entity; import java.util.HashSet; import java.util.Set; diff --git a/bot/src/main/java/backend/academy/bot/model/entities/User.java b/bot/src/main/java/backend/academy/bot/model/entity/User.java similarity index 95% rename from bot/src/main/java/backend/academy/bot/model/entities/User.java rename to bot/src/main/java/backend/academy/bot/model/entity/User.java index 1898b2d..87e7e2d 100644 --- a/bot/src/main/java/backend/academy/bot/model/entities/User.java +++ b/bot/src/main/java/backend/academy/bot/model/entity/User.java @@ -1,4 +1,4 @@ -package backend.academy.bot.model.entities; +package backend.academy.bot.model.entity; import java.util.List; diff --git a/bot/src/main/java/backend/academy/bot/service/LinkCacheService.java b/bot/src/main/java/backend/academy/bot/service/LinkCacheService.java new file mode 100644 index 0000000..fc481c5 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/service/LinkCacheService.java @@ -0,0 +1,44 @@ +package backend.academy.bot.service; + +import backend.academy.bot.client.ScrapperClient; +import backend.academy.bot.model.entity.Link; +import java.time.Duration; +import java.util.List; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +public class LinkCacheService { + private static final String KEY_PREFIX = "links:"; + + private static final Duration TTL = Duration.ofMinutes(5); + private final ReactiveRedisOperations redisOps; + private final ScrapperClient scrapperClient; + + public LinkCacheService(ReactiveRedisOperations redisOps, ScrapperClient scrapperClient) { + this.redisOps = redisOps; + this.scrapperClient = scrapperClient; + } + + public Mono invalidateCache(Long userId) { + String key = KEY_PREFIX + userId; + return redisOps.delete(key).hasElement(); + } + + public Mono> getLinks(Long userId) { + String key = "links:" + userId; + return redisOps.opsForList().range(key, 0, -1).collectList().flatMap(list -> { + if (!list.isEmpty()) { + return Mono.just(list); + } + return scrapperClient + .getLinksByUserId(userId) + .flatMapMany(Flux::fromIterable) + .flatMap(link -> redisOps.opsForList().rightPush(key, link).thenReturn(link)) + .collectList() + .flatMap(freshList -> redisOps.expire(key, TTL).thenReturn(freshList)); + }); + } +} diff --git a/bot/src/main/java/backend/academy/bot/service/notification/FallbackNotificationService.java b/bot/src/main/java/backend/academy/bot/service/notification/FallbackNotificationService.java new file mode 100644 index 0000000..d5dd170 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/service/notification/FallbackNotificationService.java @@ -0,0 +1,46 @@ +package backend.academy.bot.service.notification; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +@Primary +public class FallbackNotificationService implements NotificationService { + private final NotificationService primary; + private final NotificationService secondary; + private final CircuitBreaker notificationCb; + private static final Logger log = LoggerFactory.getLogger(FallbackNotificationService.class); + + public FallbackNotificationService( + @Qualifier(HttpNotificationService.BEAN_NAME) NotificationService http, + @Qualifier(KafkaConsumerNotificationService.BEAN_NAME) NotificationService kafka, + @Value("${app.message-transport:http}") String primaryTransport, + @Qualifier("notificationCb") CircuitBreaker notificationCb) { + this.notificationCb = notificationCb; + if ("kafka".equalsIgnoreCase(primaryTransport)) { + this.primary = kafka; + this.secondary = http; + } else { + this.primary = http; + this.secondary = kafka; + } + } + + @Override + public Mono notify(String msg, long user) { + return primary.notify(msg, user) + .transformDeferred(CircuitBreakerOperator.of(notificationCb)) + .onErrorResume(ex -> { + log.warn("Первичный транспорт упал - fallback. Причина: {}", ex.getMessage()); + return secondary.notify(msg, user); + }); + } +} + diff --git a/bot/src/main/java/backend/academy/bot/service/notification/HttpNotificationService.java b/bot/src/main/java/backend/academy/bot/service/notification/HttpNotificationService.java new file mode 100644 index 0000000..9821df2 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/service/notification/HttpNotificationService.java @@ -0,0 +1,32 @@ +package backend.academy.bot.service.notification; + +import com.pengrad.telegrambot.TelegramBot; +import com.pengrad.telegrambot.request.SendMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@Service(HttpNotificationService.BEAN_NAME) +public class HttpNotificationService implements NotificationService { + public static final String BEAN_NAME = "httpNotificationService"; + + private final TelegramBot telegramBot; + private static final Logger log = LoggerFactory.getLogger(HttpNotificationService.class); + + public HttpNotificationService(TelegramBot telegramBot) { + this.telegramBot = telegramBot; + } + + @Override + public Mono notify(String message, long userId) { + return Mono.fromCallable(() -> { + telegramBot.execute(new SendMessage(userId, message)); + return (Void) null; + }) + .doOnSuccess(v -> log.info("HTTP-уведомление доставлено: {}", message)) + .doOnError(e -> log.error("Ошибка HTTP-уведомления", e)) + .subscribeOn(Schedulers.boundedElastic()); + } +} diff --git a/bot/src/main/java/backend/academy/bot/service/notification/KafkaConsumerNotificationService.java b/bot/src/main/java/backend/academy/bot/service/notification/KafkaConsumerNotificationService.java new file mode 100644 index 0000000..a7eadbf --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/service/notification/KafkaConsumerNotificationService.java @@ -0,0 +1,57 @@ +package backend.academy.bot.service.notification; + +import backend.academy.avro.NotificationRequest; +import com.pengrad.telegrambot.TelegramBot; +import com.pengrad.telegrambot.request.SendMessage; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@Service(KafkaConsumerNotificationService.BEAN_NAME) +@ConditionalOnProperty(name = "app.message-transport", havingValue = "Kafka") +public class KafkaConsumerNotificationService implements NotificationService { + public static final String BEAN_NAME = "kafkaNotificationService"; + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerNotificationService.class); + private final TelegramBot telegramBot; + + public KafkaConsumerNotificationService(TelegramBot telegramBot) { + this.telegramBot = telegramBot; + } + + @KafkaListener( + topics = "${app.topics.input}", + groupId = "bot-group", + containerFactory = "kafkaListenerContainerFactory") + public void listen(List batch, Acknowledgment ack) { + for (var req : batch) { + process(req); + } + ack.acknowledge(); + } + + private void process(NotificationRequest req) { + if (req.getMessage() == null) { + throw new IllegalArgumentException("Message must not be null"); + } + + notify(req.getMessage(), req.getUserId()); + } + + @Override + public Mono notify(String message, long userId) { + return Mono.fromCallable(() -> { + telegramBot.execute(new SendMessage(userId, message)); + return (Void) null; + }) + .doOnSuccess(v -> log.info("Kafka-уведомление доставлено: {}", message)) + .doOnError(error -> log.error("Ошибка Kafka-уведомления", error)) + .subscribeOn(Schedulers.boundedElastic()); + } +} diff --git a/bot/src/main/java/backend/academy/bot/service/notification/NotificationService.java b/bot/src/main/java/backend/academy/bot/service/notification/NotificationService.java new file mode 100644 index 0000000..bae7511 --- /dev/null +++ b/bot/src/main/java/backend/academy/bot/service/notification/NotificationService.java @@ -0,0 +1,8 @@ +package backend.academy.bot.service.notification; + +import reactor.core.publisher.Mono; + +public interface NotificationService { + Mono notify(String message, long userId); +} + diff --git a/bot/src/main/java/backend/academy/bot/services/NotificationService.java b/bot/src/main/java/backend/academy/bot/services/NotificationService.java deleted file mode 100644 index 8cb393d..0000000 --- a/bot/src/main/java/backend/academy/bot/services/NotificationService.java +++ /dev/null @@ -1,36 +0,0 @@ -package backend.academy.bot.services; - -import com.pengrad.telegrambot.TelegramBot; -import com.pengrad.telegrambot.request.SendMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -@Service -public class NotificationService { - private final TelegramBot telegramBot; - private static final Logger log = LoggerFactory.getLogger(NotificationService.class); - - public NotificationService(TelegramBot telegramBot) { - this.telegramBot = telegramBot; - } - - public void notify(String message, long userId) { - SendMessage sendMessage = new SendMessage(userId, message); - try { - telegramBot.execute(sendMessage); - log.info( - "Уведомление успешно отправлено: действие={}, сообщение={}, id пользователя={}", - "отправлено", - message, - userId); - } catch (Exception e) { - log.error( - "Ошибка при отправке уведомления: действие={}, сообщение={}, id пользователя={}, ошибка={}", - "отправка", - message, - userId, - e.getMessage()); - } - } -} diff --git a/bot/src/main/java/backend/academy/bot/telegram/TelegramPollingService.java b/bot/src/main/java/backend/academy/bot/telegram/TelegramPollingService.java index ac8363f..6ffb672 100644 --- a/bot/src/main/java/backend/academy/bot/telegram/TelegramPollingService.java +++ b/bot/src/main/java/backend/academy/bot/telegram/TelegramPollingService.java @@ -39,11 +39,7 @@ public void pollUpdates() { try { GetUpdates getUpdates = new GetUpdates().limit(LIMIT).offset(lastUpdateId + 1).timeout(TIMEOUT); - // log.info( - // "Отправляем запрос обновлений: limit={}, offset={}, timeout={}", - // LIMIT, - // lastUpdateId + 1, - // TIMEOUT); + GetUpdatesResponse updatesResponse = telegramBot.execute(getUpdates); List updates = updatesResponse.updates(); diff --git a/bot/src/main/resources/application.properties b/bot/src/main/resources/application.properties index a50fab3..b145ee0 100644 --- a/bot/src/main/resources/application.properties +++ b/bot/src/main/resources/application.properties @@ -1,3 +1,4 @@ scrapper.base-url=http://localhost:8081 logging.structured.format.console=ecs - +app.message-transport=Kafka +#Kafka/HTTP diff --git a/bot/src/main/resources/application.yaml b/bot/src/main/resources/application.yaml index cf8ce64..1d74035 100644 --- a/bot/src/main/resources/application.yaml +++ b/bot/src/main/resources/application.yaml @@ -1,7 +1,79 @@ app: - telegram-token: ${TELEGRAM_TOKEN} # env variable + telegram-token: ${TELEGRAM_TOKEN} + rate-limit: + defaults: + capacity: 50 + refill-tokens: 50 + period: 1m + error-code: 429 + error-message: "Too Many Requests" + per-endpoint: + "/api/bot/notify": + capacity: 100 + refill-tokens: 100 + period: 1m + error-code: 429 + error-message: "Too Many Requests on api/bot/notify" + + + + topics: + input: scrapperTopic + dead-letter: deadLetterTopic + listener: + concurrency: 1 + http: + connect-timeout: 5s + read-timeout: 10s + write-timeout: 10s + response-timeout: 12s + + retry: + attempts: 3 + backoff: 2s + status-codes: + - 502 + - 503 + - 504 + + circuit-breaker: + sliding-window-size: 1 + minimum-required-calls: 1 + failure-rate-threshold: 100 + permitted-calls-in-half-open-state: 1 + wait-duration-in-open-state: 1s + spring: + devtools: + restart: + enabled: false + redis: + host: ${REDIS_HOST:localhost} + port: ${REDIS_PORT:6379} + kafka: + bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + listener: + ack-mode: manual + poll-timeout: 1s + concurrency: 1 + consumer: + enable-auto-commit: false + max-poll-records: 500 + isolation-level: read_committed + group-id: bot-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + properties: + specific.avro.reader: true + schema.registry.url: http://localhost:8082 + producer: + properties: + specific.avro.reader: true + schema.registry.url: http://localhost:8082 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer application: name: Bot liquibase: diff --git a/bot/src/test/java/backend/academy/bot/CircuitBreakerTest.java b/bot/src/test/java/backend/academy/bot/CircuitBreakerTest.java new file mode 100644 index 0000000..dacf0ca --- /dev/null +++ b/bot/src/test/java/backend/academy/bot/CircuitBreakerTest.java @@ -0,0 +1,84 @@ +package backend.academy.bot; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; + +import backend.academy.bot.config.CircuitBreakerProps; +import backend.academy.bot.config.HttpTimeoutProps; +import backend.academy.bot.config.RetryProps; +import com.github.tomakehurst.wiremock.WireMockServer; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.circuitbreaker.CallNotPermittedException; +import io.github.resilience4j.circuitbreaker.autoconfigure.CircuitBreakerAutoConfiguration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; +import org.springframework.web.reactive.function.client.WebClient; + +@SpringBootTest( + classes = { + BotConfig.class, + CircuitBreakerAutoConfiguration.class, // если используете авто-конфиг Resilience4j + HttpTimeoutProps.class, + RetryProps.class, + CircuitBreakerProps.class + }, + properties = { + "app.telegramToken=dummy", + } +) +@AutoConfigureWireMock(port = 0) +class CircuitBreakerTest { + + private static final String PATH = "/delayed"; + + @Autowired + private WebClient webClient; + + @Autowired + private WireMockServer wireMockServer; + + @Autowired + private CircuitBreakerRegistry cbRegistry; + + private CircuitBreaker cb; + + @BeforeEach + void setUp() { + cb = cbRegistry.circuitBreaker("backendServiceCb"); + cb.getEventPublisher().onStateTransition(evt -> { }); + cb.transitionToOpenState(); + } + + @Test + void whenCircuitIsOpen_thenRequestShortCircuitsBeforeHttpTimeout() { + stubFor(get(urlEqualTo(PATH)) + .willReturn(aResponse() + .withStatus(200) + .withFixedDelay(5_000))); + + long start = System.currentTimeMillis(); + + CallNotPermittedException ex = assertThrows(CallNotPermittedException.class, () -> + webClient.get() + .uri("http://localhost:{port}" + PATH, wireMockServer.port()) + .retrieve() + .bodyToMono(String.class) + .block(Duration.ofSeconds(10)) + ); + + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed < 1_000, + "CircuitBreaker должен коротить запрос сразу, а не ждать 5-секундного ответа; вместо " + elapsed + " мс"); + + assertEquals(CircuitBreaker.State.OPEN, cb.getState()); + } +} + diff --git a/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceDlqTest.java b/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceDlqTest.java new file mode 100644 index 0000000..3191767 --- /dev/null +++ b/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceDlqTest.java @@ -0,0 +1,118 @@ +package backend.academy.bot; + +import backend.academy.avro.NotificationRequest; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +@SpringBootTest +@Testcontainers +@Disabled +class KafkaConsumerNotificationServiceDlqTest { + + private static final String TOPIC = "scrapperTopic"; + private static final String DLQ_TOPIC = "deadLetterTopic"; + private static final String MOCK_SCHEMA_REGISTRY = "mock://bot-tests"; + + @Container + static KafkaContainer kafka = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); + + @DynamicPropertySource + static void springKafkaProps(DynamicPropertyRegistry reg) { + reg.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + reg.add("app.topics.input", () -> TOPIC); + reg.add("app.topics.dead-letter", () -> DLQ_TOPIC); + + reg.add("spring.kafka.producer.value-serializer", + () -> KafkaAvroSerializer.class.getName()); + reg.add("spring.kafka.producer.properties." + + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + () -> MOCK_SCHEMA_REGISTRY); + + reg.add("spring.kafka.consumer.value-deserializer", + () -> KafkaAvroDeserializer.class.getName()); + reg.add("spring.kafka.consumer.properties." + + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + () -> MOCK_SCHEMA_REGISTRY); + reg.add("spring.kafka.consumer.properties.specific.avro.reader", () -> "true"); + } + + private Consumer consumer; + + @BeforeEach + void setUp() { + Map props = + KafkaTestUtils.consumerProps(kafka.getBootstrapServers(), "test-group", "true"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY); + props.put("specific.avro.reader", true); + + consumer = new DefaultKafkaConsumerFactory(props).createConsumer(); + consumer.subscribe(List.of(DLQ_TOPIC)); + } + + @AfterEach + void tearDown() { + consumer.close(); + } + + @Test + void whenNullField_thenRecordAppearsInDlq() { + NotificationRequest bad = NotificationRequest.newBuilder() + .setUserId(123L) + .setMessage(null) + .build(); + + Map prodCfg = new HashMap<>(); + prodCfg.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + prodCfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + prodCfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); + prodCfg.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY); + + try (KafkaProducer producer = + new KafkaProducer<>(prodCfg)) { + producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(TOPIC, null, bad)); + producer.flush(); + } + + ConsumerRecord rec = + KafkaTestUtils.getSingleRecord(consumer, DLQ_TOPIC, Duration.ofSeconds(20)); + assertNotNull(rec, "Запись в DLQ не найдена"); + + NotificationRequest actual = rec.value(); + assertAll( + () -> assertEquals(bad.getUserId(), actual.getUserId()), + () -> assertNull(actual.getMessage(), "Поле message должно быть null") + ); + } +} diff --git a/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceTest.java b/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceTest.java new file mode 100644 index 0000000..3d87ab3 --- /dev/null +++ b/bot/src/test/java/backend/academy/bot/KafkaConsumerNotificationServiceTest.java @@ -0,0 +1,62 @@ +package backend.academy.bot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import backend.academy.avro.NotificationRequest; +import backend.academy.bot.service.notification.KafkaConsumerNotificationService; +import com.pengrad.telegrambot.TelegramBot; +import com.pengrad.telegrambot.request.SendMessage; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.kafka.support.Acknowledgment; + +class KafkaConsumerNotificationServiceTest { + + @Mock + private TelegramBot telegramBot; + + @Mock + private Acknowledgment acknowledgment; + + @InjectMocks + private KafkaConsumerNotificationService kafkaConsumerNotificationService; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + void shouldHandleExceptionWhenTelegramFails() { + NotificationRequest request = new NotificationRequest(); + request.setMessage("Test message"); + request.setUserId(123456789L); + + when(telegramBot.execute(any(SendMessage.class))).thenThrow(new RuntimeException("Telegram API error")); + + try { + kafkaConsumerNotificationService.listen(List.of(request), acknowledgment); + } catch (Exception ignored) { + } + + verify(telegramBot).execute(any(SendMessage.class)); + verify(acknowledgment, never()).acknowledge(); + } + + @Test + void shouldProcessValidNotificationAndAcknowledge() { + NotificationRequest request = new NotificationRequest(); + request.setMessage("Test message"); + request.setUserId(123456789L); + + kafkaConsumerNotificationService.listen(List.of(request), acknowledgment); + + verify(telegramBot).execute(any(SendMessage.class)); + verify(acknowledgment).acknowledge(); + } +} diff --git a/bot/src/test/java/backend/academy/bot/LinkCacheServiceTest.java b/bot/src/test/java/backend/academy/bot/LinkCacheServiceTest.java new file mode 100644 index 0000000..a98098a --- /dev/null +++ b/bot/src/test/java/backend/academy/bot/LinkCacheServiceTest.java @@ -0,0 +1,136 @@ +package backend.academy.bot; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.times; + +import backend.academy.bot.client.ScrapperClient; +import backend.academy.bot.model.entity.Link; +import backend.academy.bot.model.entity.User; +import backend.academy.bot.service.LinkCacheService; +import com.redis.testcontainers.RedisContainer; +import java.util.List; +import java.util.Set; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +@SpringBootTest +@Testcontainers +@EnableAutoConfiguration(exclude = KafkaAutoConfiguration.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class LinkCacheServiceTest { + + private static final long USER_ID = 42L; + + @Container + @ServiceConnection + static RedisContainer redis = new RedisContainer("redis:7.2.4"); + + @TestConfiguration + static class TestConfig { + + @Bean + @Primary + ScrapperClient scrapperClient() { + return Mockito.mock(ScrapperClient.class); + } + + @Bean + @Primary + KafkaTemplate kafkaTemplate() { + return Mockito.mock(KafkaTemplate.class); + } + + @Bean + @Primary + ConsumerFactory consumerFactory() { + return Mockito.mock(ConsumerFactory.class); + } + } + + @Autowired + private LinkCacheService linkCacheService; + + @Autowired + private ScrapperClient scrapperClient; + + private final List dummyLinks = List.of( + new Link("https://foo.bar/1", new User(123L, "123", List.of()), Set.of(), Set.of()), + new Link("https://foo.bar/2", new User(123L, "123", List.of()), Set.of(), Set.of())); + + @BeforeEach + void setUp() { + linkCacheService.invalidateCache(USER_ID).block(); + Mockito.reset(scrapperClient); + } + + @Test + void shouldFetchLinksAndCacheThem() { + Mockito.when(scrapperClient.getLinksByUserId(USER_ID)).thenReturn(Mono.just(dummyLinks)); + + StepVerifier.create(linkCacheService.getLinks(USER_ID)) + .assertNext(list -> assertThat(list) + .extracting(Link::getLink) + .containsExactly("https://foo.bar/1", "https://foo.bar/2")) + .verifyComplete(); + + Mockito.verify(scrapperClient, times(1)).getLinksByUserId(USER_ID); + } + + @Test + void shouldReturnCachedLinksOnSubsequentCalls() { + Mockito.when(scrapperClient.getLinksByUserId(USER_ID)).thenReturn(Mono.just(dummyLinks)); + + linkCacheService.getLinks(USER_ID).block(); + + StepVerifier.create(linkCacheService.getLinks(USER_ID)) + .assertNext(list -> assertThat(list) + .extracting(Link::getLink) + .containsExactly("https://foo.bar/1", "https://foo.bar/2")) + .verifyComplete(); + + Mockito.verify(scrapperClient, times(1)).getLinksByUserId(USER_ID); + } + + @Test + void shouldInvalidateEmptyCache() { + StepVerifier.create(linkCacheService.invalidateCache(USER_ID)) + .expectNext(true) + .verifyComplete(); + } + + @Test + void shouldFetchLinksAfterInvalidation() { + Mockito.when(scrapperClient.getLinksByUserId(USER_ID)).thenReturn(Mono.just(dummyLinks)); + linkCacheService.getLinks(USER_ID).block(); + Mockito.verify(scrapperClient, times(1)).getLinksByUserId(USER_ID); + + StepVerifier.create(linkCacheService.invalidateCache(USER_ID)) + .expectNext(true) + .verifyComplete(); + + Mockito.when(scrapperClient.getLinksByUserId(USER_ID)).thenReturn(Mono.just(dummyLinks)); + StepVerifier.create(linkCacheService.getLinks(USER_ID)) + .assertNext(list -> assertThat(list) + .extracting(Link::getLink) + .containsExactly("https://foo.bar/1", "https://foo.bar/2")) + .verifyComplete(); + + Mockito.verify(scrapperClient, times(2)).getLinksByUserId(USER_ID); + } +} diff --git a/bot/src/test/java/backend/academy/bot/NotificationRequestDeserializationTest.java b/bot/src/test/java/backend/academy/bot/NotificationRequestDeserializationTest.java new file mode 100644 index 0000000..ad87ab0 --- /dev/null +++ b/bot/src/test/java/backend/academy/bot/NotificationRequestDeserializationTest.java @@ -0,0 +1,58 @@ +package backend.academy.bot; + +import static org.junit.jupiter.api.Assertions.*; + +import backend.academy.bot.model.dto.NotificationRequest; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +class NotificationRequestDeserializationTest { + + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + } + + @ParameterizedTest + @MethodSource("jsonDtoProvider") + void shouldDeserializeJson( + String json, String expectedMessage, long expectedUserId, boolean ignoreUnknownProperties) + throws Exception { + if (ignoreUnknownProperties) { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + NotificationRequest dto = objectMapper.readValue(json, NotificationRequest.class); + + assertNotNull(dto, "DTO не должен быть null"); + assertEquals(expectedMessage, dto.getMessage()); + assertEquals(expectedUserId, dto.getUserId()); + } + + private static Stream jsonDtoProvider() { + String fullJson = + """ + { + "message": "Hello!", + "userId": 123456789 + } + """; + String extraFieldJson = + """ + { + "message": "Test", + "userId": 1, + "extraField": "value" + } + """; + + return Stream.of( + org.junit.jupiter.params.provider.Arguments.of(fullJson, "Hello!", 123456789L, false), + org.junit.jupiter.params.provider.Arguments.of(extraFieldJson, "Test", 1L, true)); + } +} diff --git a/docker-compose.yaml b/docker-compose.yaml index fd5eadf..f73db77 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -13,6 +13,14 @@ services: volumes: - pgdata:/var/lib/postgresql/data + redis: + image: redis:7.0-alpine + restart: always + ports: + - "6379:6379" + volumes: + - redisdata:/data + liquibase: image: liquibase/liquibase depends_on: @@ -23,6 +31,49 @@ services: sh -c "liquibase --url=jdbc:postgresql://postgres:5432/${POSTGRES_DB} --username=${LIQUIBASE_USERNAME} --password=${LIQUIBASE_PASSWORD} --changeLogFile=changelog/changelog.sql clearCheckSums && liquibase --url=jdbc:postgresql://postgres:5432/${POSTGRES_DB} --username=${LIQUIBASE_USERNAME} --password=${LIQUIBASE_PASSWORD} --changeLogFile=changelog/changelog.sql update" + zookeeper: + image: confluentinc/cp-zookeeper:7.6.0 + restart: always + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:7.6.0 + restart: always + depends_on: + - zookeeper + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_LISTENERS: > + PLAINTEXT://0.0.0.0:9092, + PLAINTEXT_INTERNAL://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: > + PLAINTEXT://localhost:9092, + PLAINTEXT_INTERNAL://kafka:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + schema-registry: + image: confluentinc/cp-schema-registry:7.6.0 + restart: always + depends_on: + - kafka + - zookeeper + ports: + - "8082:8082" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8082 + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092 + SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR: 1 volumes: + redisdata: pgdata: diff --git a/pom.xml b/pom.xml index 5260f7e..1d1666f 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,6 @@ 4.0.0 - org.springframework.boot spring-boot-starter-parent @@ -12,10 +11,11 @@ backend.academy root - ${revision} + 1.0 pom + avro-schemas bot report scrapper @@ -23,6 +23,7 @@ 1.0 + 2.14.0 3.8.8 @@ -68,6 +69,29 @@ + + org.springframework.boot + spring-boot-dependencies + 3.4.2 + pom + import + + + com.google.errorprone + error_prone_annotations + 2.28.0 + + + org.checkerframework + checker-qual + 3.48.3 + + + org.apache.avro + avro + 1.11.3 + provided + org.jspecify jspecify @@ -89,14 +113,13 @@ annotations ${jetbrains-annotations.version} - org.apache.commons commons-compress ${commons-compress.version} - org.apache.commons + commons-io commons-io ${commons-io.version} @@ -131,6 +154,15 @@ + + org.apache.avro + avro + + + io.confluent + kafka-avro-serializer + 7.6.0 + org.jspecify jspecify @@ -168,9 +200,34 @@ + + + confluent + Confluent Maven Repo + https://packages.confluent.io/maven/ + + + + + org.apache.avro + avro-maven-plugin + 1.11.3 + + + + schema + + generate-sources + + ${project.basedir}/src/main/avro + String + + + + org.apache.maven.plugins maven-compiler-plugin diff --git a/report/pom.xml b/report/pom.xml index 773e801..ba729fc 100644 --- a/report/pom.xml +++ b/report/pom.xml @@ -5,21 +5,29 @@ backend.academy root - ${revision} + 1.0 report - + + + + commons-io + commons-io + 2.16.1 + + + backend.academy bot - ${revision} + 1.0 backend.academy scrapper - ${revision} + 1.0 diff --git a/scrapper/pom.xml b/scrapper/pom.xml index 9d20e52..29b604e 100644 --- a/scrapper/pom.xml +++ b/scrapper/pom.xml @@ -5,12 +5,44 @@ backend.academy root - ${revision} + 1.0 + ../pom.xml scrapper + + 2.18.0 + 1.11.3 + + + + io.github.resilience4j + resilience4j-spring-boot2 + 1.7.1 + + + io.github.resilience4j + resilience4j-reactor + 1.7.1 + + + com.bucket4j + bucket4j_jdk17-core + 8.14.0 + + + backend.academy + avro-schemas + 1.0 + + + org.apache.avro + avro + ${avro.version} + compile + org.springframework.boot spring-boot-starter-web @@ -62,10 +94,10 @@ - - - - + + org.springframework.kafka + spring-kafka + @@ -147,11 +179,15 @@ kafka test - - - - - + + org.springframework.kafka + spring-kafka-test + test + + + commons-io + commons-io + diff --git a/scrapper/scrapper.Dockerfile b/scrapper/scrapper.Dockerfile new file mode 100644 index 0000000..5797152 --- /dev/null +++ b/scrapper/scrapper.Dockerfile @@ -0,0 +1,11 @@ +FROM maven:3.8.8-eclipse-temurin-21 AS build +WORKDIR /workspace +COPY scrapper/pom.xml scrapper/pom.xml +COPY scrapper/src scrapper/src +RUN mvn -f scrapper/pom.xml clean package -DskipTests + +FROM eclipse-temurin:21-jre-alpine +WORKDIR /app +COPY --from=build /workspace/scrapper/target/scrapper.jar ./scrapper.jar +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "/app/scrapper.jar"] diff --git a/scrapper/src/main/java/backend/academy/scrapper/ScrapperConfig.java b/scrapper/src/main/java/backend/academy/scrapper/ScrapperConfig.java index b999760..bad00c2 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/ScrapperConfig.java +++ b/scrapper/src/main/java/backend/academy/scrapper/ScrapperConfig.java @@ -1,11 +1,112 @@ package backend.academy.scrapper; +import backend.academy.scrapper.config.CircuitBreakerProps; +import backend.academy.scrapper.config.HttpTimeoutProps; +import backend.academy.scrapper.config.RetryProps; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import jakarta.validation.constraints.NotEmpty; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpMethod; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.validation.annotation.Validated; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.util.retry.Retry; @Validated -@ConfigurationProperties(prefix = "app", ignoreUnknownFields = false) +@ConfigurationProperties(prefix = "app", ignoreUnknownFields = true) +@ConfigurationPropertiesScan public record ScrapperConfig(@NotEmpty String githubToken, StackOverflowCredentials stackOverflow) { - public record StackOverflowCredentials(@NotEmpty String key, @NotEmpty String accessToken) {} + public record StackOverflowCredentials(@NotEmpty String key, @NotEmpty String accessToken) { + } + + @Bean + public ObjectMapper objectMapper() { + return new ObjectMapper(); + } + + @Bean + public CircuitBreaker circuitBreaker(CircuitBreakerProps props) { + CircuitBreakerConfig cfg = CircuitBreakerConfig.custom() + .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .slidingWindowSize(props.slidingWindowSize()) + .minimumNumberOfCalls(props.minimumRequiredCalls()) + .failureRateThreshold(props.failureRateThreshold()) + .permittedNumberOfCallsInHalfOpenState(props.permittedCallsInHalfOpenState()) + .waitDurationInOpenState(props.waitDurationInOpenState()) + .recordExceptions(Throwable.class) + .build(); + + return CircuitBreaker.of("backendServiceCb", cfg); + } + + @Bean(name = "notificationCb") + public CircuitBreaker notificationCircuitBreaker(CircuitBreakerRegistry registry) { + return registry.circuitBreaker("notificationCb"); + } + + @Bean + public WebClient webClient(HttpTimeoutProps t, RetryProps retryProps, CircuitBreakerProps circuitBreakerProps) { + HttpClient http = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) t.connectTimeout().toMillis()) + .responseTimeout(t.responseTimeout()) + .doOnConnected(conn -> conn + .addHandlerLast(new ReadTimeoutHandler( + t.readTimeout().toMillis(), TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler( + t.writeTimeout().toMillis(), TimeUnit.MILLISECONDS))); + + + ExchangeFilterFunction cbFilter = (request, next) -> + next.exchange(request) + .transformDeferred(CircuitBreakerOperator.of(circuitBreaker(circuitBreakerProps))); + + ExchangeFilterFunction retryFilter = (request, next) -> next.exchange(request) + .flatMap(response -> { + if (shouldRetry(request, response, retryProps)) { + return response.createException() + .flatMap(Mono::error); + } + return Mono.just(response); + }) + .retryWhen(Retry.fixedDelay(retryProps.attempts(), retryProps.backoff()) + .filter(ex -> isRetryable(ex, retryProps))); + + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(http)) + .filter(cbFilter) + .filter(retryFilter) + .build(); + } + + private boolean shouldRetry(ClientRequest req, ClientResponse resp, RetryProps rp) { + boolean codeMatch = rp.statusCodes().contains(resp.statusCode().value()); + boolean methodOk = req.method() != null && + List.of(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.PUT, + HttpMethod.DELETE, HttpMethod.OPTIONS) + .contains(req.method()); + return codeMatch && methodOk; + } + + private boolean isRetryable(Throwable ex, RetryProps rp) { + return ex instanceof WebClientResponseException wce && + rp.statusCodes().contains(wce.getRawStatusCode()); + } } diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/CircuitBreakerProps.java b/scrapper/src/main/java/backend/academy/scrapper/config/CircuitBreakerProps.java new file mode 100644 index 0000000..912263d --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/config/CircuitBreakerProps.java @@ -0,0 +1,17 @@ +package backend.academy.scrapper.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import java.time.Duration; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Validated +@ConfigurationProperties(prefix = "app.http.circuit-breaker") +public record CircuitBreakerProps( + @Min(1) int slidingWindowSize, + @Min(1) int minimumRequiredCalls, + @Min(1) int failureRateThreshold, + @Min(1) int permittedCallsInHalfOpenState, + @NotNull Duration waitDurationInOpenState) { } + diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/HttpTimeoutProps.java b/scrapper/src/main/java/backend/academy/scrapper/config/HttpTimeoutProps.java new file mode 100644 index 0000000..e41013a --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/config/HttpTimeoutProps.java @@ -0,0 +1,15 @@ +package backend.academy.scrapper.config; + +import jakarta.validation.constraints.NotNull; +import java.time.Duration; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Validated +@ConfigurationProperties(prefix = "app.http") +public record HttpTimeoutProps( + @NotNull Duration connectTimeout, + @NotNull Duration readTimeout, + @NotNull Duration writeTimeout, + @NotNull Duration responseTimeout) { } + diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitConfig.java b/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitConfig.java new file mode 100644 index 0000000..e2148f2 --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitConfig.java @@ -0,0 +1,80 @@ +package backend.academy.scrapper.config; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.servlet.FilterRegistrationBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.web.filter.OncePerRequestFilter; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Configuration +@EnableConfigurationProperties(RateLimitProps.class) +public class RateLimitConfig { + + private final RateLimitProps props; + private final ConcurrentMap buckets = new ConcurrentHashMap<>(); + + public RateLimitConfig(RateLimitProps props) { + this.props = props; + } + + @Bean + public FilterRegistrationBean rateLimitFilter() { + OncePerRequestFilter filter = new OncePerRequestFilter() { + @Override + protected void doFilterInternal( + HttpServletRequest req, HttpServletResponse res, FilterChain chain) + throws IOException, ServletException { + + String ip = extractClientIp(req); + String path = req.getRequestURI(); + String key = ip + "::" + path; + + Bucket bucket = buckets.computeIfAbsent(key, k -> newBucketFor(path)); + if (bucket.tryConsume(1)) { + chain.doFilter(req, res); + } else { + RateLimitProps.RateLimitSettings s = settingsFor(path); + res.setStatus(s.getErrorCode()); + res.getWriter().write(s.getErrorMessage()); + } + } + }; + + FilterRegistrationBean reg = new FilterRegistrationBean<>(filter); + reg.setOrder(Ordered.HIGHEST_PRECEDENCE); + // при необходимости можно ограничить urlPatterns: + // reg.addUrlPatterns("/api/*"); + return reg; + } + + private Bucket newBucketFor(String path) { + RateLimitProps.RateLimitSettings s = settingsFor(path); + Refill refill = Refill.intervally(s.getRefillTokens(), s.getPeriod()); + Bandwidth limit = Bandwidth.classic(s.getCapacity(), refill); + return Bucket.builder().addLimit(limit).build(); + } + + private RateLimitProps.RateLimitSettings settingsFor(String path) { + return props.getPerEndpoint().getOrDefault(path, props.getDefaults()); + } + + private String extractClientIp(HttpServletRequest req) { + String xf = req.getHeader("X-Forwarded-For"); + if (xf != null && !xf.isBlank()) { + return xf.split(",")[0].trim(); + } + return req.getRemoteAddr(); + } +} diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitProps.java b/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitProps.java new file mode 100644 index 0000000..0fe46c3 --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/config/RateLimitProps.java @@ -0,0 +1,63 @@ +package backend.academy.scrapper.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +@Validated +@ConfigurationProperties(prefix = "app.rate-limit") +public class RateLimitProps { + + @NotNull + private RateLimitSettings defaults; + + @NotNull + private Map perEndpoint = new HashMap<>(); + + public RateLimitSettings getDefaults() { + return defaults; + } + + public void setDefaults(RateLimitSettings defaults) { + this.defaults = defaults; + } + + public Map getPerEndpoint() { + return perEndpoint; + } + + public void setPerEndpoint(Map perEndpoint) { + this.perEndpoint = perEndpoint; + } + + @Validated + public static class RateLimitSettings { + @Min(1) + private int capacity; + @Min(1) + private int refillTokens; + @NotNull + private Duration period; + @Min(100) + private int errorCode; + @NotEmpty + private String errorMessage; + + public int getCapacity() { return capacity; } + public void setCapacity(int capacity) { this.capacity = capacity; } + public int getRefillTokens() { return refillTokens; } + public void setRefillTokens(int refillTokens) { this.refillTokens = refillTokens; } + public Duration getPeriod() { return period; } + public void setPeriod(Duration period) { this.period = period; } + public int getErrorCode() { return errorCode; } + public void setErrorCode(int errorCode) { this.errorCode = errorCode; } + public String getErrorMessage() { return errorMessage; } + public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; } + } +} diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/RetryProps.java b/scrapper/src/main/java/backend/academy/scrapper/config/RetryProps.java new file mode 100644 index 0000000..4475316 --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/config/RetryProps.java @@ -0,0 +1,18 @@ +package backend.academy.scrapper.config; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; +import java.util.List; + +@Validated +@ConfigurationProperties(prefix = "app.http.retry") +public record RetryProps( + @NotNull @Min(1) Integer attempts, + @NotNull Duration backoff, + @NotEmpty List statusCodes) { } + diff --git a/scrapper/src/main/java/backend/academy/scrapper/config/WebClientConfig.java b/scrapper/src/main/java/backend/academy/scrapper/config/WebClientConfig.java deleted file mode 100644 index e036240..0000000 --- a/scrapper/src/main/java/backend/academy/scrapper/config/WebClientConfig.java +++ /dev/null @@ -1,13 +0,0 @@ -package backend.academy.scrapper.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.reactive.function.client.WebClient; - -@Configuration -public class WebClientConfig { - @Bean - public WebClient webClient() { - return WebClient.builder().build(); - } -} diff --git a/scrapper/src/main/java/backend/academy/scrapper/controller/ListController.java b/scrapper/src/main/java/backend/academy/scrapper/controller/ListController.java index eb7ae23..92fb705 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/controller/ListController.java +++ b/scrapper/src/main/java/backend/academy/scrapper/controller/ListController.java @@ -2,6 +2,7 @@ import backend.academy.scrapper.model.entities.Link; import backend.academy.scrapper.service.linkService.LinkService; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; @@ -34,6 +35,10 @@ public ListController(LinkService linkService) { @ApiResponse(responseCode = "400", description = "Неверный формат запроса"), @ApiResponse(responseCode = "500", description = "Внутренняя ошибка сервера") }) + @SuppressFBWarnings( + value = "ENTITY_LEAK", + justification = + "Возвращаем Link напрямую, т.к. в нём нет служебных полей и он эквивалентен публичному DTO.") @GetMapping("/links") public Flux getLinks(@RequestParam("userId") long userId) { log.info("Получение списка ссылок для пользователя: {}", userId); diff --git a/scrapper/src/main/java/backend/academy/scrapper/exceptions/DataAccessException.java b/scrapper/src/main/java/backend/academy/scrapper/exception/DataAccessException.java similarity index 89% rename from scrapper/src/main/java/backend/academy/scrapper/exceptions/DataAccessException.java rename to scrapper/src/main/java/backend/academy/scrapper/exception/DataAccessException.java index 4c13694..c34b7f4 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/exceptions/DataAccessException.java +++ b/scrapper/src/main/java/backend/academy/scrapper/exception/DataAccessException.java @@ -1,4 +1,4 @@ -package backend.academy.scrapper.exceptions; +package backend.academy.scrapper.exception; public class DataAccessException extends RuntimeException { diff --git a/scrapper/src/main/java/backend/academy/scrapper/exceptions/ResourceNotFoundException.java b/scrapper/src/main/java/backend/academy/scrapper/exception/ResourceNotFoundException.java similarity index 90% rename from scrapper/src/main/java/backend/academy/scrapper/exceptions/ResourceNotFoundException.java rename to scrapper/src/main/java/backend/academy/scrapper/exception/ResourceNotFoundException.java index 05bb6ff..1944ff5 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/exceptions/ResourceNotFoundException.java +++ b/scrapper/src/main/java/backend/academy/scrapper/exception/ResourceNotFoundException.java @@ -1,4 +1,4 @@ -package backend.academy.scrapper.exceptions; +package backend.academy.scrapper.exception; public class ResourceNotFoundException extends RuntimeException { diff --git a/scrapper/src/main/java/backend/academy/scrapper/exceptions/UserAlreadyExistsException.java b/scrapper/src/main/java/backend/academy/scrapper/exception/UserAlreadyExistsException.java similarity index 90% rename from scrapper/src/main/java/backend/academy/scrapper/exceptions/UserAlreadyExistsException.java rename to scrapper/src/main/java/backend/academy/scrapper/exception/UserAlreadyExistsException.java index ccbf225..54d6a3f 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/exceptions/UserAlreadyExistsException.java +++ b/scrapper/src/main/java/backend/academy/scrapper/exception/UserAlreadyExistsException.java @@ -1,4 +1,4 @@ -package backend.academy.scrapper.exceptions; +package backend.academy.scrapper.exception; public class UserAlreadyExistsException extends RuntimeException { public UserAlreadyExistsException() { diff --git a/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/DataAccessExceptionHandler.java b/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/DataAccessExceptionHandler.java index 7538949..1b65348 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/DataAccessExceptionHandler.java +++ b/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/DataAccessExceptionHandler.java @@ -1,6 +1,6 @@ package backend.academy.scrapper.exceptionHandler; -import backend.academy.scrapper.exceptions.ResourceNotFoundException; +import backend.academy.scrapper.exception.ResourceNotFoundException; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.ExceptionHandler; diff --git a/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/UserExceptionHandler.java b/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/UserExceptionHandler.java index 7415d6a..098bf26 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/UserExceptionHandler.java +++ b/scrapper/src/main/java/backend/academy/scrapper/exceptionHandler/UserExceptionHandler.java @@ -1,6 +1,6 @@ package backend.academy.scrapper.exceptionHandler; -import backend.academy.scrapper.exceptions.UserAlreadyExistsException; +import backend.academy.scrapper.exception.UserAlreadyExistsException; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.ExceptionHandler; diff --git a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Filter.java b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Filter.java index 6788e5c..28bf764 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Filter.java +++ b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Filter.java @@ -27,8 +27,6 @@ public class Filter { @NotNull private Set trackingLinks = new HashSet<>(); - ; - public Filter() {} public Set getTrackingLinks() { diff --git a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Link.java b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Link.java index 6d1c751..c7a38b8 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Link.java +++ b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Link.java @@ -41,8 +41,6 @@ public class Link { inverseJoinColumns = @JoinColumn(name = "tag_id")) private Set tags = new HashSet<>(); - ; - @ManyToMany @NotNull @JoinTable( @@ -51,8 +49,6 @@ public class Link { inverseJoinColumns = @JoinColumn(name = "filter_id")) private Set filters = new HashSet<>(); - ; - @Column(name = "last_updated") @Nullable private Instant lastUpdated; diff --git a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Tag.java b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Tag.java index a22434a..35f3e32 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/model/entities/Tag.java +++ b/scrapper/src/main/java/backend/academy/scrapper/model/entities/Tag.java @@ -27,8 +27,6 @@ public class Tag { @NotNull private Set trackingLinks = new HashSet<>(); - ; - public Tag() {} public Tag(String tag) { diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/linkService/UnifiedLinkService.java b/scrapper/src/main/java/backend/academy/scrapper/service/linkService/UnifiedLinkService.java index ba9aed8..a177057 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/service/linkService/UnifiedLinkService.java +++ b/scrapper/src/main/java/backend/academy/scrapper/service/linkService/UnifiedLinkService.java @@ -4,7 +4,7 @@ import backend.academy.scrapper.data.LinkRepository; import backend.academy.scrapper.data.TagRepository; import backend.academy.scrapper.data.UserRepository; -import backend.academy.scrapper.exceptions.ResourceNotFoundException; +import backend.academy.scrapper.exception.ResourceNotFoundException; import backend.academy.scrapper.model.entities.Filter; import backend.academy.scrapper.model.entities.Link; import backend.academy.scrapper.model.entities.Tag; diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/notification/FallbackNotificationService.java b/scrapper/src/main/java/backend/academy/scrapper/service/notification/FallbackNotificationService.java new file mode 100644 index 0000000..d42b890 --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/service/notification/FallbackNotificationService.java @@ -0,0 +1,47 @@ +package backend.academy.scrapper.service.notification; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +@Primary +public class FallbackNotificationService implements NotificationService { + + private final NotificationService primary; + private final NotificationService secondary; + private final CircuitBreaker notificationCb; + private static final Logger log = LoggerFactory.getLogger(FallbackNotificationService.class); + + public FallbackNotificationService( + @Qualifier(HttpNotificationService.BEAN_NAME) NotificationService http, + @Qualifier(KafkaProducerNotificationService.BEAN_NAME) NotificationService kafka, + @Value("${app.message-transport:http}") String primaryTransport, + @Qualifier("notificationCb") CircuitBreaker notificationCb) { + this.notificationCb = notificationCb; + if ("kafka".equalsIgnoreCase(primaryTransport)) { + this.primary = kafka; + this.secondary = http; + } else { + this.primary = http; + this.secondary = kafka; + } + } + + @Override + public Mono sendNotification(String message, long userId) { + return primary.sendNotification(message, userId) + .transformDeferred(CircuitBreakerOperator.of(notificationCb)) + .onErrorResume(ex -> { + log.warn("Первичный транспорт упал - fallback. Причина: {}", ex.getMessage()); + return secondary.sendNotification(message, userId); + }); + } +} + diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/notification/HttpNotificationService.java b/scrapper/src/main/java/backend/academy/scrapper/service/notification/HttpNotificationService.java index 5b19685..7306397 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/service/notification/HttpNotificationService.java +++ b/scrapper/src/main/java/backend/academy/scrapper/service/notification/HttpNotificationService.java @@ -3,11 +3,15 @@ import backend.academy.scrapper.model.dto.NotificationRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; -@Service +@Service(HttpNotificationService.BEAN_NAME) public class HttpNotificationService implements NotificationService { + public static final String BEAN_NAME = "httpNotificationService"; + private static final Logger log = LoggerFactory.getLogger(HttpNotificationService.class); private final WebClient webClient; @@ -16,17 +20,15 @@ public HttpNotificationService(WebClient webClient) { } @Override - public void sendNotification(String message, long userId) { - log.info("Отправлено уведомление {}", message); + public Mono sendNotification(String message, long userId) { String botNotificationUrl = "http://localhost:8080/api/bot/notify"; - webClient - .post() - .uri(botNotificationUrl) - .bodyValue(new NotificationRequest(message, userId)) - .retrieve() - .bodyToMono(Void.class) - .subscribe( - unused -> log.info("Уведомление успешно доставлено"), - error -> log.error("Ошибка при отправке уведомления", error)); + String url = botNotificationUrl + "/api/bot/notify"; + return webClient.post() + .uri(url) + .bodyValue(new NotificationRequest(message, userId)) + .retrieve() + .bodyToMono(Void.class) + .doOnSuccess(v -> log.info("HTTP-уведомление доставлено")) + .doOnError(error -> log.warn("HTTP-ошибка", error)); } } diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/notification/KafkaProducerNotificationService.java b/scrapper/src/main/java/backend/academy/scrapper/service/notification/KafkaProducerNotificationService.java new file mode 100644 index 0000000..b7ddec6 --- /dev/null +++ b/scrapper/src/main/java/backend/academy/scrapper/service/notification/KafkaProducerNotificationService.java @@ -0,0 +1,39 @@ +package backend.academy.scrapper.service.notification; + +import backend.academy.avro.NotificationRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service(KafkaProducerNotificationService.BEAN_NAME) + +public class KafkaProducerNotificationService implements NotificationService { + public static final String BEAN_NAME = "kafkaNotificationService"; + private static final Logger log = LoggerFactory.getLogger(KafkaProducerNotificationService.class); + private final KafkaTemplate kafkaTemplate; + + @Value("${app.topics.output}") + private String topic; + + public KafkaProducerNotificationService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public Mono sendNotification(String message, long userId) { + NotificationRequest record = NotificationRequest.newBuilder() + .setMessage(message) + .setUserId(userId) + .build(); + + return Mono.fromFuture(kafkaTemplate.send(topic, record)) + .doOnSuccess(result -> log.info("Kafka-сообщение отправлено offset={}", + result.getRecordMetadata().offset())) + .doOnError(error -> log.error("Ошибка при отправке Avro-сообщения", error)) + .then(); + } + +} diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/notification/NotificationService.java b/scrapper/src/main/java/backend/academy/scrapper/service/notification/NotificationService.java index 8ebec4c..f78d4de 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/service/notification/NotificationService.java +++ b/scrapper/src/main/java/backend/academy/scrapper/service/notification/NotificationService.java @@ -1,8 +1,9 @@ package backend.academy.scrapper.service.notification; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; @Service public interface NotificationService { - void sendNotification(String message, long userId); + Mono sendNotification(String message, long userId); } diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/userService/UnifiedUserService.java b/scrapper/src/main/java/backend/academy/scrapper/service/userService/UnifiedUserService.java index 4b581b6..8051b92 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/service/userService/UnifiedUserService.java +++ b/scrapper/src/main/java/backend/academy/scrapper/service/userService/UnifiedUserService.java @@ -1,7 +1,7 @@ package backend.academy.scrapper.service.userService; import backend.academy.scrapper.data.UserRepository; -import backend.academy.scrapper.exceptions.UserAlreadyExistsException; +import backend.academy.scrapper.exception.UserAlreadyExistsException; import backend.academy.scrapper.model.entities.User; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; diff --git a/scrapper/src/main/java/backend/academy/scrapper/service/userService/UserService.java b/scrapper/src/main/java/backend/academy/scrapper/service/userService/UserService.java index 73678cc..3c58be9 100644 --- a/scrapper/src/main/java/backend/academy/scrapper/service/userService/UserService.java +++ b/scrapper/src/main/java/backend/academy/scrapper/service/userService/UserService.java @@ -1,6 +1,6 @@ package backend.academy.scrapper.service.userService; -import backend.academy.scrapper.exceptions.UserAlreadyExistsException; +import backend.academy.scrapper.exception.UserAlreadyExistsException; import backend.academy.scrapper.model.entities.User; import org.springframework.stereotype.Service; diff --git a/scrapper/src/main/resources/application.properties b/scrapper/src/main/resources/application.properties index 7651d8e..377a9e8 100644 --- a/scrapper/src/main/resources/application.properties +++ b/scrapper/src/main/resources/application.properties @@ -1,3 +1,5 @@ logging.structured.format.console=ecs -access-type=ORM +access-type=SQL #ORM/SQL +app.message-transport=Kafka +#Kafka/HTTP diff --git a/scrapper/src/main/resources/application.yaml b/scrapper/src/main/resources/application.yaml index 32966fe..5edb6af 100644 --- a/scrapper/src/main/resources/application.yaml +++ b/scrapper/src/main/resources/application.yaml @@ -1,10 +1,78 @@ app: + topics: + output: scrapperTopic github-token: ${GITHUB_TOKEN} # env variable stackoverflow: key: ${SO_TOKEN_KEY} access-token: ${SO_ACCESS_TOKEN} + rate-limit: + defaults: + capacity: 50 + refill-tokens: 50 + period: 1m + error-code: 429 + error-message: "Too Many Requests" + per-endpoint: + "api/scrapper/links": + capacity: 20 + refill-tokens: 20 + period: 1m + error-code: 429 + error-message: "Too Many Requests on api/scrapper/links" + "api/scrapper/user": + capacity: 20 + refill-tokens: 20 + period: 1m + error-code: 429 + error-message: "Too Many Requests on api/scrapper/user" + "api/scrapper/track": + capacity: 100 + refill-tokens: 100 + period: 1m + error-code: 429 + error-message: "Too Many Requests on api/scrapper/track" + "api/scrapper/untrack": + capacity: 100 + refill-tokens: 100 + period: 1m + error-code: 429 + error-message: "Too Many Requests on api/scrapper/untrack" + + http: + connect-timeout: 5s + read-timeout: 10s + write-timeout: 10s + response-timeout: 12s + + retry: + attempts: 3 + backoff: 2s + status-codes: + - 502 + - 503 + - 504 + + circuit-breaker: + sliding-window-size: 1 + minimum-required-calls: 1 + failure-rate-threshold: 100 + permitted-calls-in-half-open-state: 1 + wait-duration-in-open-state: 1s + spring: + devtools: + restart: + enabled: false + kafka: + bootstrap-servers: localhost:9092 + properties: + schema.registry.url: http://localhost:8082 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + retries: 3 + acks: all application: name: Scrapper liquibase: diff --git a/scrapper/src/test/java/backend/academy/scrapper/CircuitBreakerFailFastTest.java b/scrapper/src/test/java/backend/academy/scrapper/CircuitBreakerFailFastTest.java new file mode 100644 index 0000000..a6b1a1e --- /dev/null +++ b/scrapper/src/test/java/backend/academy/scrapper/CircuitBreakerFailFastTest.java @@ -0,0 +1,105 @@ +package backend.academy.scrapper; + +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import io.github.resilience4j.circuitbreaker.*; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.transport.TransportConfig; +import reactor.test.StepVerifier; + +import static com.github.tomakehurst.wiremock.client.WireMock.*; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Проверяем, что после первой неудачи Circuit-Breaker открывается + * и второй вызов завершается CallNotPermittedException быстрее сетевых таймаутов. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) +@TestPropertySource(properties = { // фиктивные токены → контекст собирается без real-секретов + "GITHUB_TOKEN=dummy", + "SO_TOKEN_KEY=dummy", + "SO_ACCESS_TOKEN=dummy" +}) +@ExtendWith(WireMockExtension.class) +class CircuitBreakerFailFastTest { + + /* ---------- тестовая инфраструктура ------------------------------------------------------ */ + + private String baseUrl; + private WebClient webClient; // WebClient c нужными таймаутами + private final CircuitBreaker cb = // CB, который открывается после ПЕРВОЙ ошибки + CircuitBreaker.of("slowApi", CircuitBreakerConfig.custom() + .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) + .slidingWindowSize(1) + .minimumNumberOfCalls(1) + .failureRateThreshold(1) // 1 неудача ⇒ 100 % ошибок ⇒ OPEN + .waitDurationInOpenState(Duration.ofSeconds(30)) + .permittedNumberOfCallsInHalfOpenState(1) + .recordException(e -> true) // любая ошибка — причина фейла + .build()); + + @BeforeEach + void setUpStubAndClient(WireMockRuntimeInfo wm) { + if (baseUrl == null) { // регистрируем WireMock только один раз + baseUrl = wm.getHttpBaseUrl(); // http://localhost:{port} + wm.getWireMock().register( + get(urlPathEqualTo("/slow")) + .willReturn(aResponse() + .withFixedDelay(15_000) // дольше responseTimeout-а (10 с) + .withStatus(504)) + ); + } + + /* WebClient со «строгими» таймаутами: 5 с connect-timeout + 10 с response-timeout */ + HttpClient httpClient = HttpClient.create() + .option(TransportConfig.CONNECT_TIMEOUT_MILLIS, 5_000) + .responseTimeout(Duration.ofSeconds(10)); + + webClient = WebClient.builder() + .clientConnector(new reactor.netty.http.client.HttpClientConnector(httpClient)) + .build(); + } + + /* ---------- полезный метод --------------------------------------------------------------- */ + + private Mono callSlow() { + return webClient.get() + .uri(baseUrl + "/slow") + .retrieve() + .bodyToMono(Void.class) + .transformDeferred(CircuitBreakerOperator.of(cb)); // вешаем CB поверх вызова + } + + /* ---------- сам тест --------------------------------------------------------------------- */ + + @Test + void circuitOpensAfterFirstTimeout_andFailsFastOnSecondCall() { + + /* 1) первый запрос → зависает дольше responseTimeout-а и падает ReadTimeoutException */ + StepVerifier.create(callSlow()) + .expectError() // любая ошибка с таймаута + .verify(Duration.ofSeconds(12)); // 10 с + небольшой запас + + /* 2) CB уже OPEN → CallNotPermittedException приходит «мгновенно» */ + long start = System.nanoTime(); + + StepVerifier.create(callSlow()) + .expectError(CallNotPermittedException.class) + .verify(Duration.ofMillis(300)); // надёжный запас + + long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + assertThat(tookMs) + .as("Ошибка должна прийти быстрее connect-timeout’а (5 000 мс)") + .isLessThan(500); + } +} diff --git a/scrapper/src/test/java/backend/academy/scrapper/FallbackNotificationServiceTest.java b/scrapper/src/test/java/backend/academy/scrapper/FallbackNotificationServiceTest.java new file mode 100644 index 0000000..18e451a --- /dev/null +++ b/scrapper/src/test/java/backend/academy/scrapper/FallbackNotificationServiceTest.java @@ -0,0 +1,29 @@ +package backend.academy.scrapper; + +import backend.academy.scrapper.service.notification.FallbackNotificationService; +import backend.academy.scrapper.service.notification.NotificationService; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import static org.mockito.Mockito.when; + +class FallbackNotificationServiceTest { + + @Test + void shouldFallbackToKafkaWhenHttpFails() { + NotificationService http = Mockito.mock(NotificationService.class); + NotificationService kafka = Mockito.mock(NotificationService.class); + + when(http.sendNotification(Mockito.anyString(), Mockito.anyLong())) + .thenReturn(Mono.error(new RuntimeException("timeout"))); + when(kafka.sendNotification(Mockito.anyString(), Mockito.anyLong())) + .thenReturn(Mono.empty()); + + FallbackNotificationService fallback = + new FallbackNotificationService(http, kafka, "http"); + + StepVerifier.create(fallback.sendNotification("hi", 1L)) + .verifyComplete(); + } +} diff --git a/scrapper/src/test/java/backend/academy/scrapper/JdbcUserServiceTest.java b/scrapper/src/test/java/backend/academy/scrapper/JdbcUserServiceTest.java index 8f55cdf..4412a7a 100644 --- a/scrapper/src/test/java/backend/academy/scrapper/JdbcUserServiceTest.java +++ b/scrapper/src/test/java/backend/academy/scrapper/JdbcUserServiceTest.java @@ -4,7 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import backend.academy.scrapper.data.jdbcRepositories.JdbcUserRepository; -import backend.academy.scrapper.exceptions.UserAlreadyExistsException; +import backend.academy.scrapper.exception.UserAlreadyExistsException; import backend.academy.scrapper.model.entities.User; import backend.academy.scrapper.service.userService.UserService; import org.junit.jupiter.api.Test; diff --git a/scrapper/src/test/java/backend/academy/scrapper/OrmLinkServiceTest.java b/scrapper/src/test/java/backend/academy/scrapper/OrmLinkServiceTest.java index 7f6cd7c..1ef51bc 100644 --- a/scrapper/src/test/java/backend/academy/scrapper/OrmLinkServiceTest.java +++ b/scrapper/src/test/java/backend/academy/scrapper/OrmLinkServiceTest.java @@ -8,7 +8,7 @@ import backend.academy.scrapper.data.ormRepositories.OrmLinkRepository; import backend.academy.scrapper.data.ormRepositories.OrmTagRepository; import backend.academy.scrapper.data.ormRepositories.OrmUserRepository; -import backend.academy.scrapper.exceptions.ResourceNotFoundException; +import backend.academy.scrapper.exception.ResourceNotFoundException; import backend.academy.scrapper.model.entities.Filter; import backend.academy.scrapper.model.entities.Link; import backend.academy.scrapper.model.entities.Tag; diff --git a/scrapper/src/test/java/backend/academy/scrapper/OrmUserServiceTest.java b/scrapper/src/test/java/backend/academy/scrapper/OrmUserServiceTest.java index 603dbd3..7fb8db2 100644 --- a/scrapper/src/test/java/backend/academy/scrapper/OrmUserServiceTest.java +++ b/scrapper/src/test/java/backend/academy/scrapper/OrmUserServiceTest.java @@ -4,7 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import backend.academy.scrapper.data.ormRepositories.OrmUserRepository; -import backend.academy.scrapper.exceptions.UserAlreadyExistsException; +import backend.academy.scrapper.exception.UserAlreadyExistsException; import backend.academy.scrapper.model.entities.User; import backend.academy.scrapper.service.userService.UserService; import org.junit.jupiter.api.AfterEach; diff --git a/spotbugs-excludes.xml b/spotbugs-excludes.xml index f7db4e8..763b365 100644 --- a/spotbugs-excludes.xml +++ b/spotbugs-excludes.xml @@ -7,6 +7,12 @@ + + + + + +