Skip to content

Commit

Permalink
Payments provider flow (#28)
Browse files Browse the repository at this point in the history
* add ProviderPaymentsTask

* fix props

* add DisputesService.finishSuccess

* add ProviderPaymentsAdminManagementHandler

* refactor

* bump proto

* add testFullFlowCreateAdjustmentWhenFailedPaymentSuccess

* add testFullFlowCreateAdjustmentWhenFailedPaymentSuccess

* decreasy woody (#27)

(cherry picked from commit 69d9aee)

* fix returning ProviderCallback id

* fix returning ProviderCallback id

* fix returning ProviderCallback id

* review fixes

* review fixes
  • Loading branch information
karle0wne authored Nov 7, 2024
1 parent ceca663 commit f15ba61
Show file tree
Hide file tree
Showing 43 changed files with 900 additions and 194 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>3.0.4</version>
<version>3.0.6</version>
</parent>

<artifactId>disputes-api</artifactId>
Expand Down Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>disputes-proto</artifactId>
<version>1.34-dd41929</version>
<version>1.38-8d4b4a5</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
Expand All @@ -39,7 +37,7 @@ public class AdminManagementDisputesService {
private final FileStorageService fileStorageService;
private final CloseableHttpClient httpClient;

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
@Transactional
public void cancelPendingDispute(CancelParams cancelParams) {
var disputeId = cancelParams.getDisputeId();
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
Expand All @@ -60,7 +58,7 @@ public void cancelPendingDispute(CancelParams cancelParams) {
}
}

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
@Transactional
public void approvePendingDispute(ApproveParams approveParam) {
var disputeId = approveParam.getDisputeId();
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
Expand Down Expand Up @@ -88,7 +86,7 @@ public void approvePendingDispute(ApproveParams approveParam) {
log.info("Request was skipped by inappropriate status {}", dispute);
}

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.REPEATABLE_READ)
@Transactional
public void bindCreatedDispute(BindParams bindParam) {
var disputeId = bindParam.getDisputeId();
log.debug("Trying to getForUpdateSkipLocked {}", disputeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;
Expand All @@ -22,7 +21,7 @@ public class ApiAttachmentsService {
private final FileMetaDao fileMetaDao;
private final FileStorageService fileStorageService;

@Transactional(propagation = Propagation.REQUIRED)
@Transactional
public void createAttachments(CreateRequest req, UUID disputeId) {
log.debug("Trying to save Attachments {}", disputeId);
for (var attachment : req.getAttachments()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;
Expand Down Expand Up @@ -40,7 +39,7 @@ public Optional<Dispute> checkExistBeforeCreate(String invoiceId, String payment
return first;
}

@Transactional(propagation = Propagation.REQUIRED)
@Transactional
public UUID createDispute(CreateRequest req, PaymentParams paymentParams) {
log.debug("Start creating Dispute {}", paymentParams);
var dispute = disputeConverter.convert(paymentParams, req.getAmount(), req.getReason());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,13 @@ public ExecutorService disputesThreadPool(@Value("${dispute.batchSize}") int thr
.build();
return Executors.newFixedThreadPool(threadPoolSize, threadFactory);
}

@Bean
public ExecutorService providerPaymentsThreadPool(@Value("${provider.payments.batchSize}") int threadPoolSize) {
final var threadFactory = new ThreadFactoryBuilder()
.setNameFormat("provider-payments-exec-%d")
.setDaemon(true)
.build();
return Executors.newFixedThreadPool(threadPoolSize, threadFactory);
}
}
8 changes: 4 additions & 4 deletions src/main/java/dev/vality/disputes/config/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ public class CacheConfig {

@Bean
@Primary
public CacheManager providersDisputesCacheManager() {
public CacheManager providerDisputesCacheManager() {
var caffeineCacheManager = new CaffeineCacheManager();
caffeineCacheManager.setCaffeine(adaptersConnectionsCacheConfig());
caffeineCacheManager.setCacheNames(List.of("providersDisputes"));
caffeineCacheManager.setCacheNames(List.of("providerDisputes"));
return caffeineCacheManager;
}

@Bean
public CacheManager providersPaymentsCacheManager() {
public CacheManager providerPaymentsCacheManager() {
var caffeineCacheManager = new CaffeineCacheManager();
caffeineCacheManager.setCaffeine(adaptersConnectionsCacheConfig());
caffeineCacheManager.setCacheNames(List.of("providersPayments"));
caffeineCacheManager.setCacheNames(List.of("providerPayments"));
return caffeineCacheManager;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/vality/disputes/config/NetworkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NetworkConfig {
public static final String MERCHANT = "/v1/merchant";
public static final String ADMIN_MANAGEMENT = "/v1/admin-management";
public static final String CALLBACK = "/v1/callback";
public static final String PAYMENTS_ADMIN_MANAGEMENT = "/v1/payments-admin-management";
public static final String PROVIDER_PAYMENTS_ADMIN_MANAGEMENT = "/v1/provider-payments-admin-management";

@Bean
public FilterRegistrationBean externalPortRestrictingFilter() {
Expand All @@ -42,7 +42,7 @@ protected void doFilterInternal(HttpServletRequest request,
|| servletPath.startsWith(MERCHANT)
|| servletPath.startsWith(ADMIN_MANAGEMENT)
|| servletPath.startsWith(CALLBACK)
|| servletPath.startsWith(PAYMENTS_ADMIN_MANAGEMENT);
|| servletPath.startsWith(PROVIDER_PAYMENTS_ADMIN_MANAGEMENT);
if ((request.getLocalPort() == restPort) && !enabledPaths) {
response.sendError(404, "Unknown address");
return;
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/dev/vality/disputes/dao/DisputeDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ public List<Dispute> getDisputesForUpdateSkipLocked(int limit, DisputeStatus dis
.orElse(List.of());
}

public List<Dispute> getDisputesForUpdateSkipLocked(String invoiceId, String paymentId) {
var query = getDslContext().selectFrom(DISPUTE)
.where(DISPUTE.INVOICE_ID.eq(invoiceId)
.and(DISPUTE.PAYMENT_ID.eq(paymentId)))
.forUpdate()
.skipLocked();
return Optional.ofNullable(fetch(query, disputeRowMapper))
.orElse(List.of());
}

public List<Dispute> getDisputesForHgCall(int limit) {
var query = getDslContext().selectFrom(DISPUTE)
.where(DISPUTE.STATUS.eq(DisputeStatus.create_adjustment)
Expand Down
61 changes: 0 additions & 61 deletions src/main/java/dev/vality/disputes/dao/ProviderCallbackDao.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package dev.vality.disputes.provider.payments.admin;

import dev.vality.disputes.domain.enums.ProviderPaymentsStatus;
import dev.vality.disputes.domain.tables.pojos.ProviderCallback;
import dev.vality.disputes.provider.payments.dao.ProviderCallbackDao;
import dev.vality.provider.payments.ApproveParamsRequest;
import dev.vality.provider.payments.CancelParamsRequest;
import dev.vality.provider.payments.ProviderPaymentsAdminManagementServiceSrv;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
@Slf4j
@SuppressWarnings({"ParameterName", "LineLength"})
public class ProviderPaymentsAdminManagementHandler implements ProviderPaymentsAdminManagementServiceSrv.Iface {

private final ProviderCallbackDao providerCallbackDao;

@Override
@Transactional
public void cancel(CancelParamsRequest cancelParamsRequest) throws TException {
if (cancelParamsRequest.isCancelAll()) {
var batch = providerCallbackDao.getAllPendingProviderCallbacksForUpdateSkipLocked().stream()
.peek(providerCallback -> setCancelled(cancelParamsRequest, providerCallback))
.toList();
log.info("batch by cancelParamsRequest {}", batch);
providerCallbackDao.updateBatch(batch);
} else if (cancelParamsRequest.getCancelParams().isPresent()) {
var invoicePaymentIds = cancelParamsRequest.getCancelParams().get().stream()
.map(cancelParams -> cancelParams.getInvoiceId() + cancelParams.getPaymentId())
.collect(Collectors.toSet());
var batch = providerCallbackDao.getProviderCallbacksForUpdateSkipLocked(invoicePaymentIds).stream()
.peek(providerCallback -> setCancelled(cancelParamsRequest, providerCallback))
.toList();
log.info("batch by cancelParamsRequest {}", batch);
providerCallbackDao.updateBatch(batch);
}
}

@Override
@Transactional
public void approve(ApproveParamsRequest approveParamsRequest) throws TException {
if (approveParamsRequest.isApproveAll()) {
var batch = providerCallbackDao.getAllPendingProviderCallbacksForUpdateSkipLocked().stream()
.peek(providerCallback -> setReadyToCreateAdjustment(approveParamsRequest, providerCallback))
.toList();
log.info("batch by approveParamsRequest {}", batch);
providerCallbackDao.updateBatch(batch);
} else if (approveParamsRequest.getApproveParams().isPresent()) {
var invoicePaymentIds = approveParamsRequest.getApproveParams().get().stream()
.map(approveParams -> approveParams.getInvoiceId() + approveParams.getPaymentId())
.collect(Collectors.toSet());
var batch = providerCallbackDao.getProviderCallbacksForUpdateSkipLocked(invoicePaymentIds).stream()
.peek(providerCallback -> setReadyToCreateAdjustment(approveParamsRequest, providerCallback))
.toList();
log.info("batch by approveParamsRequest {}", batch);
providerCallbackDao.updateBatch(batch);
}
}

private void setCancelled(CancelParamsRequest cancelParamsRequest, ProviderCallback providerCallback) {
if (cancelParamsRequest.getCancelReason().isPresent()) {
providerCallback.setErrorReason(cancelParamsRequest.getCancelReason().orElse(null));
}
providerCallback.setStatus(ProviderPaymentsStatus.cancelled);
}

private void setReadyToCreateAdjustment(ApproveParamsRequest approveParamsRequest, ProviderCallback providerCallback) {
if (approveParamsRequest.getApproveReason().isPresent()) {
providerCallback.setApproveReason(approveParamsRequest.getApproveReason().orElse(null));
}
providerCallback.setStatus(ProviderPaymentsStatus.create_adjustment);
providerCallback.setSkipCallHgForCreateAdjustment(false);
}
}
Loading

0 comments on commit f15ba61

Please sign in to comment.