Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ public class COBConstant {

public static final String COB_CUSTOM_JOB_PARAMETER_KEY = "CUSTOM_JOB_PARAMETER_ID";

public static final String COB_PARAMETER = "loanCobParameter";
public static final Long NUMBER_OF_DAYS_BEHIND = 1L;
public static final String PARTITION_KEY = "partition";
public static final String PARTITION_PREFIX = "partition_";

protected COBConstant() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.common;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.cob.COBConstant;
import org.apache.fineract.cob.data.BusinessStepNameAndOrder;
import org.apache.fineract.cob.data.COBParameter;
import org.apache.fineract.cob.data.COBPartition;
import org.apache.fineract.cob.resolver.BusinessDateResolver;
import org.apache.fineract.cob.resolver.CatchUpFlagResolver;
import org.apache.fineract.cob.service.RetrieveIdService;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.util.StopWatch;

@Slf4j
@RequiredArgsConstructor
public abstract class CommonPartitioner {

private final JobOperator jobOperator;
private final StepExecution stepExecution;
private final Long numberOfDays;
private final RetrieveIdService retrieveIdService;

public Map<String, ExecutionContext> getPartitions(int partitionSize, Set<BusinessStepNameAndOrder> cobBusinessSteps) {
if (cobBusinessSteps.isEmpty()) {
stopJobExecution();
return Map.of();
}
LocalDate businessDate = BusinessDateResolver.resolve(stepExecution);
boolean isCatchUp = CatchUpFlagResolver.resolve(stepExecution);
StopWatch sw = new StopWatch();
sw.start();
List<COBPartition> partitions = new ArrayList<>(
retrieveIdService.retrieveLoanCOBPartitions(numberOfDays, businessDate, isCatchUp, partitionSize));
sw.stop();
// if there is no loan to be closed, we still would like to create at least one partition

if (partitions.isEmpty()) {
partitions.add(new COBPartition(0L, 0L, 1L, 0L));
}
log.info(
"{}} found {} loans to be processed as part of COB. {} partitions were created using partition size {}. RetrieveLoanCOBPartitions was executed in {} ms.",
getClass().getName(), getLoanCount(partitions), partitions.size(), partitionSize, sw.getTotalTimeMillis());
return partitions.stream().collect(Collectors.toMap(l -> COBConstant.PARTITION_PREFIX + l.getPageNo(),
l -> createExecutionContextForPartition(cobBusinessSteps, l, businessDate, isCatchUp)));
}

private long getLoanCount(List<COBPartition> loanCOBPartitions) {
return loanCOBPartitions.stream().map(COBPartition::getCount).reduce(0L, Long::sum);
}

private ExecutionContext createExecutionContextForPartition(Set<BusinessStepNameAndOrder> cobBusinessSteps,
COBPartition loanCOBPartition, LocalDate businessDate, boolean isCatchUp) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put(COBConstant.BUSINESS_STEPS, cobBusinessSteps);
executionContext.put(COBConstant.COB_PARAMETER, new COBParameter(loanCOBPartition.getMinId(), loanCOBPartition.getMaxId()));
executionContext.put(COBConstant.PARTITION_KEY, COBConstant.PARTITION_PREFIX + loanCOBPartition.getPageNo());
executionContext.put(COBConstant.BUSINESS_DATE_PARAMETER_NAME, businessDate.toString());
executionContext.put(COBConstant.IS_CATCH_UP_PARAMETER_NAME, Boolean.toString(isCatchUp));
return executionContext;
}

private void stopJobExecution() {
Long jobId = stepExecution.getJobExecution().getId();
try {
jobOperator.stop(jobId);
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
log.error("There is no running execution for the given execution ID. Execution ID: {}", jobId);
throw new RuntimeException(e);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public static COBParameter convert(Object obj) {
if (obj instanceof COBParameter) {
return (COBParameter) obj;
} else if (obj instanceof LoanCOBParameter loanCOBParameter) {
// for backward compatibility
return loanCOBParameter.toCOBParameter();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.fineract.cob.data;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -31,4 +35,10 @@ public class BusinessStepNameAndOrder {

private String stepName;
private Long stepOrder;

public static TreeMap<Long, String> getBusinessStepMap(Set<BusinessStepNameAndOrder> businessSteps) {
Map<Long, String> businessStepMap = businessSteps.stream()
.collect(Collectors.toMap(BusinessStepNameAndOrder::getStepOrder, BusinessStepNameAndOrder::getStepName));
return new TreeMap<>(businessStepMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
@NoArgsConstructor
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
@EqualsAndHashCode
@Deprecated
public class LoanCOBParameter {

private Long minLoanId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.domain;

import java.sql.PreparedStatement;
import java.time.LocalDate;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.springframework.jdbc.core.JdbcTemplate;

@RequiredArgsConstructor
@Slf4j
public abstract class AbstractLockingService<T extends AccountLock> implements LockingService<T> {

private final JdbcTemplate jdbcTemplate;
private final FineractProperties fineractProperties;
private final AccountLockRepository<T> loanAccountLockRepository;

protected abstract String getBatchLoanLockUpgrade();

protected abstract String getBatchLoanLockInsert();

@Override
public void upgradeLock(List<Long> accountsToLock, LockOwner lockOwner) {
jdbcTemplate.batchUpdate(getBatchLoanLockUpgrade(), accountsToLock, getInClauseParameterSizeLimit(), (ps, id) -> {
ps.setString(1, lockOwner.name());
ps.setObject(2, DateUtils.getAuditOffsetDateTime());
ps.setLong(3, id);
});
}

@Override
public List<T> findAllByLoanIdIn(List<Long> loanIds) {
return loanAccountLockRepository.findAllByLoanIdIn(loanIds);
}

@Override
public T findByLoanIdAndLockOwner(Long loanId, LockOwner lockOwner) {
return loanAccountLockRepository.findByLoanIdAndLockOwner(loanId, lockOwner).orElseGet(() -> {
log.warn("There is no lock for loan account with id: {}", loanId);
return null;
});
}

@Override
public List<T> findAllByLoanIdInAndLockOwner(List<Long> loanIds, LockOwner lockOwner) {
return loanAccountLockRepository.findAllByLoanIdInAndLockOwner(loanIds, lockOwner);
}

@Override
public void applyLock(List<Long> loanIds, LockOwner lockOwner) {
LocalDate cobBusinessDate = ThreadLocalContextUtil.getBusinessDateByType(BusinessDateType.COB_DATE);
jdbcTemplate.batchUpdate(getBatchLoanLockInsert(), loanIds, loanIds.size(), (PreparedStatement ps, Long loanId) -> {
ps.setLong(1, loanId);
ps.setLong(2, 1);
ps.setString(3, lockOwner.name());
ps.setObject(4, DateUtils.getAuditOffsetDateTime());
ps.setObject(5, cobBusinessDate);
});
}

@Override
public void deleteByLoanIdInAndLockOwner(List<Long> loanIds, LockOwner lockOwner) {
loanAccountLockRepository.deleteByLoanIdInAndLockOwner(loanIds, lockOwner);
}

private int getInClauseParameterSizeLimit() {
return fineractProperties.getQuery().getInClauseParameterSizeLimit();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.domain;

import jakarta.persistence.Column;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.MappedSuperclass;
import jakarta.persistence.PostLoad;
import jakarta.persistence.PrePersist;
import jakarta.persistence.Transient;
import jakarta.persistence.Version;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.springframework.data.domain.Persistable;

@Getter
@MappedSuperclass
@NoArgsConstructor
public abstract class AccountLock implements Persistable<Long>, Serializable {

protected static final long serialVersionUID = 2272591907035824317L;

@Id
@Getter
@Column(name = "loan_id", nullable = false)
protected Long loanId;

@Version
@Getter
@Column(name = "version")
protected Long version;

@Enumerated(EnumType.STRING)
@Getter
@Column(name = "lock_owner", nullable = false)
protected LockOwner lockOwner;

@Column(name = "lock_placed_on", nullable = false)
@Getter
protected OffsetDateTime lockPlacedOn;

@Column(name = "error")
@Getter
protected String error;

@Column(name = "stacktrace")
@Getter
protected String stacktrace;

@Column(name = "lock_placed_on_cob_business_date")
@Getter
protected LocalDate lockPlacedOnCobBusinessDate;

@Transient
@Setter(value = AccessLevel.NONE)
@Getter
protected boolean isNew = true;

@PrePersist
@PostLoad
void markNotNew() {
this.isNew = false;
}

@Override
public Long getId() {
return getLoanId();
}

public AccountLock(Long loanId, LockOwner lockOwner, LocalDate lockPlacedOnCobBusinessDate) {
this.loanId = loanId;
this.lockOwner = lockOwner;
this.lockPlacedOn = DateUtils.getAuditOffsetDateTime();
this.lockPlacedOnCobBusinessDate = lockPlacedOnCobBusinessDate;
}

public void setError(String errorMessage, String stacktrace) {
this.error = errorMessage;
this.stacktrace = stacktrace;
}

public void setNewLockOwner(LockOwner newLockOwner) {
this.lockOwner = newLockOwner;
this.lockPlacedOn = DateUtils.getAuditOffsetDateTime();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.cob.domain;

import java.util.List;
import java.util.Optional;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.NoRepositoryBean;

@NoRepositoryBean
public interface AccountLockRepository<T extends AccountLock> {

Optional<T> findByLoanIdAndLockOwner(Long loanId, LockOwner lockOwner);

void deleteByLoanIdInAndLockOwner(List<Long> loanIds, LockOwner lockOwner);

List<T> findAllByLoanIdIn(List<Long> loanIds);

boolean existsByLoanIdAndLockOwner(Long loanId, LockOwner lockOwner);

boolean existsByLoanIdAndLockOwnerAndErrorIsNotNull(Long loanId, LockOwner lockOwner);

List<T> findAllByLoanIdInAndLockOwner(List<Long> loanIds, LockOwner lockOwner);

void removeByLockOwnerInAndErrorIsNotNullAndLockPlacedOnCobBusinessDateIsNotNull(List<LockOwner> lockOwners);

Page<T> findAll(Pageable loanAccountLockPage);
}
Loading
Loading