SubscriptionApplicationService.java
package com.yumu.noveltranslator.application.service;
import com.stripe.model.Event;
import com.yumu.noveltranslator.domain.model.StripeCustomer;
import com.yumu.noveltranslator.domain.model.StripeSubscription;
import com.yumu.noveltranslator.domain.model.User;
import com.yumu.noveltranslator.domain.model.UserPlanHistory;
import com.yumu.noveltranslator.enums.BillingCycle;
import com.yumu.noveltranslator.enums.SubscriptionPlan;
import com.yumu.noveltranslator.port.dto.subscription.CheckoutSessionRequest;
import com.yumu.noveltranslator.port.dto.subscription.CheckoutSessionResponse;
import com.yumu.noveltranslator.port.dto.subscription.PaymentVerificationResponse;
import com.yumu.noveltranslator.port.dto.subscription.PortalSessionResponse;
import com.yumu.noveltranslator.port.dto.subscription.SubscriptionStatusResponse;
import com.yumu.noveltranslator.port.out.BillingRepositoryPort;
import com.yumu.noveltranslator.port.out.PaymentPort;
import com.yumu.noveltranslator.port.out.TokenRevocationPort;
import com.yumu.noveltranslator.port.out.UserRepositoryPort;
import com.yumu.noveltranslator.port.out.payment.CustomerInfo;
import com.yumu.noveltranslator.port.out.payment.PaymentSessionInfo;
import com.yumu.noveltranslator.port.out.payment.SubscriptionInfo;
import com.yumu.noveltranslator.port.out.payment.SubscriptionUpdateRequest;
import com.yumu.noveltranslator.properties.StripeProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
import java.util.Optional;
/**
* 订阅应用服务
*
* <p>事务边界原则:Stripe HTTP 调用在事务外执行(通过 PaymentPort),DB 操作通过 TransactionTemplate 编程式事务执行。
*/
@Slf4j
@Service
public class SubscriptionApplicationService implements com.yumu.noveltranslator.port.in.SubscriptionPort {
private final StripeProperties stripeProperties;
private final BillingRepositoryPort billingPort;
private final UserRepositoryPort userRepositoryPort;
private final StringRedisTemplate stringRedisTemplate;
private final TokenRevocationPort tokenRevocationPort;
private final PaymentPort paymentPort;
private final PlatformTransactionManager transactionManager;
private final TransactionTemplate tx;
public SubscriptionApplicationService(StripeProperties stripeProperties,
BillingRepositoryPort billingPort,
UserRepositoryPort userRepositoryPort,
StringRedisTemplate stringRedisTemplate,
TokenRevocationPort tokenRevocationPort,
PaymentPort paymentPort,
PlatformTransactionManager transactionManager) {
this.stripeProperties = stripeProperties;
this.billingPort = billingPort;
this.userRepositoryPort = userRepositoryPort;
this.stringRedisTemplate = stringRedisTemplate;
this.tokenRevocationPort = tokenRevocationPort;
this.paymentPort = paymentPort;
this.transactionManager = transactionManager;
TransactionDefinition def = new TransactionDefinition() {
@Override public int getPropagationBehavior() { return TransactionDefinition.PROPAGATION_REQUIRED; }
@Override public int getIsolationLevel() { return TransactionDefinition.ISOLATION_DEFAULT; }
@Override public int getTimeout() { return 30; }
@Override public boolean isReadOnly() { return false; }
@Override public String getName() { return "subscription-tx"; }
};
this.tx = new TransactionTemplate(transactionManager, def);
}
// ==================== 用户端 API ====================
/**
* 验证 Checkout Session 支付结果(前端回调时主动查询)
*/
public PaymentVerificationResponse verifyCheckoutSession(String sessionId, Long userId) {
if (sessionId == null || sessionId.isBlank()) {
return new PaymentVerificationResponse(false, sessionId, null, null, "缺少 session_id 参数");
}
try {
PaymentSessionInfo session = paymentPort.retrieveCheckoutSession(sessionId);
String metadataUserId = Optional.ofNullable(session.metadata())
.map(m -> m.get("userId"))
.orElse(null);
if (metadataUserId == null || !metadataUserId.equals(String.valueOf(userId))) {
return new PaymentVerificationResponse(false, sessionId, null, null, "支付会话不属于当前用户");
}
String paymentStatus = session.paymentStatus();
String plan = Optional.ofNullable(session.metadata())
.map(m -> m.get("plan"))
.orElse(null);
if ("paid".equals(paymentStatus)) {
StripeSubscription sub = Optional.ofNullable(session.subscriptionId())
.map(billingPort::findSubscriptionByStripeId)
.orElse(null);
if (sub != null) {
return new PaymentVerificationResponse(true, sessionId, plan, sub.getStatus(), "支付成功,订阅已激活");
}
return new PaymentVerificationResponse(true, sessionId, plan, "pending", "支付已确认,订阅正在激活中");
} else if ("unpaid".equals(paymentStatus)) {
return new PaymentVerificationResponse(false, sessionId, plan, "unpaid", "支付尚未完成");
} else if ("no_payment_required".equals(paymentStatus)) {
return new PaymentVerificationResponse(false, sessionId, plan, "no_payment_required", "无需支付");
}
return new PaymentVerificationResponse(false, sessionId, plan, paymentStatus, "支付状态: " + paymentStatus);
} catch (Exception e) {
log.error("Failed to verify checkout session {}: {}", sessionId, e.getMessage(), e);
return new PaymentVerificationResponse(false, sessionId, null, null, "无法验证支付状态");
}
}
/**
* 创建订阅支付会话
*/
public CheckoutSessionResponse createCheckoutSession(Long userId, CheckoutSessionRequest request) {
SubscriptionPlan plan = validatePlan(request.getPlan());
BillingCycle billingCycle = validateBillingCycle(request.getBillingCycle());
String priceId = getPriceId(plan, billingCycle);
// 1. Stripe HTTP 调用在事务外
StripeCustomer customer = getOrCreateCustomer(userId);
StripeSubscription existingSub = billingPort.findActiveSubscriptionByUserId(userId);
// 2. 已有活跃订阅 → 升级现有订阅
if (existingSub != null) {
return upgradeSubscription(userId, existingSub, priceId, plan, billingCycle);
}
// 3. 无活跃订阅 → 创建新 Checkout Session
try {
String url = paymentPort.createCheckoutSession(
customer.getStripeCustomerId(), priceId,
stripeProperties.getSuccessUrl(), stripeProperties.getCancelUrl());
return new CheckoutSessionResponse(url);
} catch (Exception e) {
log.error("Failed to create Stripe Checkout Session for user {}: {}", userId, e.getMessage(), e);
throw new RuntimeException("创建支付会话失败", e);
}
}
/**
* 升级现有订阅
*/
private CheckoutSessionResponse upgradeSubscription(Long userId,
StripeSubscription existingSub,
String newPriceId,
SubscriptionPlan newPlan,
BillingCycle billingCycle) {
// Stripe HTTP 调用在事务外
SubscriptionUpdateRequest request = new SubscriptionUpdateRequest(
existingSub.getStripeSubscriptionId(), newPriceId, null);
SubscriptionInfo updated;
try {
// Retrieve to get the item ID
SubscriptionInfo subInfo = paymentPort.retrieveSubscription(existingSub.getStripeSubscriptionId());
String oldItemId = subInfo.firstItemId();
if (oldItemId == null) {
throw new IllegalStateException("Subscription " + existingSub.getStripeSubscriptionId() + " has no items");
}
updated = paymentPort.updateSubscription(existingSub.getStripeSubscriptionId(),
new SubscriptionUpdateRequest(oldItemId, newPriceId, null));
} catch (Exception e) {
log.error("Failed to upgrade subscription for user {}: {}", userId, e.getMessage(), e);
throw new RuntimeException("升级订阅失败", e);
}
// DB 操作在编程式事务内
tx.execute(status -> {
doUpgradeSubscription(existingSub, updated, newPriceId, newPlan, billingCycle);
return null;
});
return new CheckoutSessionResponse(null, true);
}
/**
* 获取当前订阅状态
*/
public SubscriptionStatusResponse getSubscriptionStatus(Long userId) {
var subscriptions = billingPort.findSubscriptionsByUserId(userId);
StripeSubscription sub = subscriptions.isEmpty() ? null : subscriptions.get(0);
if (sub == null) {
return new SubscriptionStatusResponse("FREE", "none", null, false);
}
return new SubscriptionStatusResponse(
sub.getPlan(),
sub.getStatus(),
sub.getCurrentPeriodEnd(),
Boolean.TRUE.equals(sub.getCancelAtPeriodEnd())
);
}
/**
* 取消订阅(在周期结束时取消)
*/
public SubscriptionStatusResponse cancelSubscription(Long userId) {
StripeSubscription sub = billingPort.findActiveSubscriptionByUserId(userId);
if (sub == null) {
throw new RuntimeException("没有可取消的活跃订阅");
}
// Stripe HTTP 调用在事务外
try {
paymentPort.updateSubscription(sub.getStripeSubscriptionId(),
new SubscriptionUpdateRequest(null, null, true));
} catch (Exception e) {
log.error("Failed to cancel Stripe subscription {}: {}", sub.getStripeSubscriptionId(), e.getMessage(), e);
throw new RuntimeException("取消订阅失败", e);
}
// DB 操作在编程式事务内
return tx.execute(status -> doCancelSubscription(sub));
}
/**
* 创建 Portal Session(账单管理跳转)
*/
public PortalSessionResponse createPortalSession(Long userId) {
StripeCustomer customer = billingPort.findCustomerByUserIdAndNotDeleted(userId);
if (customer == null) {
throw new RuntimeException("未找到 Stripe 客户");
}
String url = paymentPort.createBillingPortalSession(customer.getStripeCustomerId(), stripeProperties.getCancelUrl());
return new PortalSessionResponse(url);
}
// ==================== Webhook 事件处理 ====================
public void handleCheckoutSessionCompleted(Event event) {
SessionInfo session = deserializeSessionInfo(event, "checkout.session.completed");
if (session == null) {
throw new IllegalStateException("Failed to deserialize checkout.session.completed event: " + event.getId());
}
Map<String, String> metadata = session.metadata();
String userIdStr = metadata != null ? metadata.get("userId") : null;
String planStr = metadata != null ? metadata.get("plan") : null;
String billingCycleStr = metadata != null ? metadata.get("billingCycle") : null;
if (userIdStr == null || planStr == null) {
throw new IllegalStateException("Missing required metadata in session " + session.id() + ": userId=" + userIdStr + ", plan=" + planStr);
}
Long userId = Long.parseLong(userIdStr);
SubscriptionPlan plan = validatePlan(planStr);
BillingCycle billingCycle = BillingCycle.fromValue(billingCycleStr != null ? billingCycleStr : "monthly");
String subscriptionId = session.subscriptionId();
if (subscriptionId == null) {
throw new IllegalStateException("No subscription in session " + session.id());
}
// Stripe HTTP 调用在事务外
SubscriptionInfo stripeSub;
StripeCustomer customer;
try {
stripeSub = paymentPort.retrieveSubscription(subscriptionId);
customer = getOrCreateCustomer(userId);
} catch (Exception e) {
log.error("checkout.session.completed: Stripe API error for session {}: {}", session.id(), e.getMessage(), e);
throw new RuntimeException("Stripe API call failed for session " + session.id(), e);
}
// DB 操作在编程式事务内
tx.execute(status -> {
doHandleCheckoutSessionCompleted(event, userId, plan, billingCycle, subscriptionId, stripeSub, customer);
return null;
});
}
public void handleSubscriptionUpdated(Event event) {
SubscriptionInfo stripeSub = deserializeSubscriptionInfo(event, "subscription.updated");
if (stripeSub == null) {
throw new IllegalStateException("Failed to deserialize subscription.updated event: " + event.getId());
}
String subscriptionId = stripeSub.id();
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord == null) {
log.warn("subscription.updated: no local record for subscription {}", subscriptionId);
return;
}
tx.execute(status -> {
long eventCreated = event.getCreated();
int rows = billingPort.atomicUpdateSubscription(subRecord.getId(), event.getId(), stripeSub.status(), eventCreated);
if (rows == 0) {
log.info("subscription.updated: already processed event {}, skipping", event.getId());
return null;
}
// 更新时间字段
LocalDateTime newPeriodStart = toLocalDateTime(stripeSub.currentPeriodStart());
LocalDateTime newPeriodEnd = toLocalDateTime(stripeSub.currentPeriodEnd());
LocalDateTime newCanceledAt = toLocalDateTime(stripeSub.canceledAt());
if (newPeriodStart != null || newPeriodEnd != null || newCanceledAt != null) {
billingPort.updateSubscriptionFields(subRecord.getId(), newPeriodStart, newPeriodEnd, newCanceledAt);
}
// 同步 userLevel(通过 afterCommit 保证 Redis 在 DB 提交后写入)
scheduleAfterCommit(() -> syncUserLevelAfterStatusChange(subRecord, stripeSub.status(), event));
return null;
});
}
public void handleSubscriptionDeleted(Event event) {
SubscriptionInfo stripeSub = deserializeSubscriptionInfo(event, "subscription.deleted");
if (stripeSub == null) {
throw new IllegalStateException("Failed to deserialize subscription.deleted event: " + event.getId());
}
String subscriptionId = stripeSub.id();
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord == null) {
throw new IllegalStateException("subscription.deleted: no local record for subscription " + subscriptionId);
}
tx.execute(status -> {
long eventCreated = event.getCreated();
int rows = billingPort.atomicUpdateSubscription(subRecord.getId(), event.getId(), "canceled", eventCreated);
if (rows == 0) {
log.info("subscription.deleted: already processed event {}, skipping", event.getId());
return null;
}
scheduleAfterCommit(() -> updateUserLevel(subRecord.getUserId(), "FREE", "subscription.deleted", event.getId()));
return null;
});
}
public void handleSubscriptionResumed(Event event) {
SubscriptionInfo stripeSub = deserializeSubscriptionInfo(event, "subscription.resumed");
if (stripeSub == null) {
throw new IllegalStateException("Failed to deserialize subscription.resumed event: " + event.getId());
}
String subscriptionId = stripeSub.id();
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord == null) {
throw new IllegalStateException("subscription.resumed: no local record for subscription " + subscriptionId);
}
tx.execute(status -> {
long eventCreated = event.getCreated();
int rows = billingPort.atomicUpdateSubscription(subRecord.getId(), event.getId(), stripeSub.status(), eventCreated);
if (rows == 0) {
log.info("subscription.resumed: already processed event {}, skipping", event.getId());
}
return null;
});
}
public void handleInvoicePaymentFailed(Event event) {
com.stripe.model.Invoice invoice = deserializeStripeObject(event, com.stripe.model.Invoice.class, "invoice.payment_failed");
if (invoice == null) {
throw new IllegalStateException("Failed to deserialize invoice.payment_failed event: " + event.getId());
}
String subscriptionId = invoice.getSubscription();
if (subscriptionId == null) {
log.warn("invoice.payment_failed: no subscription in invoice {}", event.getId());
return;
}
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord != null) {
tx.execute(status -> {
long eventCreated = event.getCreated();
int rows = billingPort.atomicUpdateSubscription(subRecord.getId(), event.getId(), "past_due", eventCreated);
if (rows == 0) {
log.info("invoice.payment_failed: already processed event {}, skipping", event.getId());
} else {
log.warn("invoice.payment_failed: subscription {} marked past_due (grace period applies)", subscriptionId);
}
return null;
});
} else {
log.warn("invoice.payment_failed: no local record for subscription {}", subscriptionId);
}
}
public void handleInvoicePaymentSucceeded(Event event) {
com.stripe.model.Invoice invoice = deserializeStripeObject(event, com.stripe.model.Invoice.class, "invoice.payment_succeeded");
if (invoice == null) {
throw new IllegalStateException("Failed to deserialize invoice.payment_succeeded event: " + event.getId());
}
String subscriptionId = invoice.getSubscription();
if (subscriptionId == null) {
log.info("invoice.payment_succeeded: no subscription in invoice {}, skipping", event.getId());
return;
}
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
// 已有记录且状态已是 active/trialing → 已由 checkout.session.completed 处理
if (subRecord != null
&& ("active".equals(subRecord.getStatus()) || "trialing".equals(subRecord.getStatus()))) {
log.info("invoice.payment_succeeded: subscription {} already active, skipping (handled by checkout.session.completed)",
subscriptionId);
return;
}
// 已有记录但状态不是 active → 用原子更新激活
if (subRecord != null) {
tx.execute(status -> {
doActivateSubscriptionFromInvoice(event, subRecord);
return null;
});
return;
}
// 无本地记录 → 从孤立发票创建订阅(Stripe HTTP 在事务外,DB 在事务内)
createSubscriptionFromOrphanedInvoice(event, invoice, subscriptionId);
}
/**
* 从孤立发票创建订阅记录(Stripe HTTP 调用在事务外)
*/
private void createSubscriptionFromOrphanedInvoice(Event event,
com.stripe.model.Invoice invoice,
String subscriptionId) {
String userIdStr = invoice.getMetadata() != null ? invoice.getMetadata().get("userId") : null;
if (userIdStr == null) {
throw new IllegalStateException("invoice.payment_succeeded: orphaned invoice " + event.getId()
+ " has no userId in metadata, cannot create subscription");
}
Long userId = Long.parseLong(userIdStr);
StripeCustomer customer;
SubscriptionInfo stripeSub;
try {
customer = getOrCreateCustomer(userId);
stripeSub = paymentPort.retrieveSubscription(subscriptionId);
} catch (Exception e) {
log.error("invoice.payment_succeeded: Stripe API error for orphaned invoice {}: {}",
event.getId(), e.getMessage(), e);
throw new RuntimeException("Stripe API call failed for orphaned invoice " + event.getId(), e);
}
tx.execute(status -> {
doCreateSubscriptionFromOrphanedInvoice(event, userId, subscriptionId, stripeSub, customer, invoice);
return null;
});
}
// ==================== 窄事务 DB 操作方法(由 TransactionTemplate 包裹) ====================
void doHandleCheckoutSessionCompleted(Event event, Long userId, SubscriptionPlan plan,
BillingCycle billingCycle, String subscriptionId,
SubscriptionInfo stripeSub, StripeCustomer customer) {
StripeSubscription subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord != null && event.getId().equals(subRecord.getLastWebhookEventId())) {
log.info("checkout.session.completed: already processed event {}, skipping", event.getId());
return;
}
if (subRecord == null) {
subRecord = new StripeSubscription();
subRecord.setUserId(userId);
subRecord.setStripeCustomerId(customer.getStripeCustomerId());
subRecord.setStripeSubscriptionId(subscriptionId);
subRecord.setPlan(plan.getValue());
subRecord.setBillingCycle(billingCycle.getValue());
subRecord.setStripePriceId(stripeSub.priceId());
subRecord.setStatus(stripeSub.status());
subRecord.setCancelAtPeriodEnd(stripeSub.cancelAtPeriodEnd());
subRecord.setCurrentPeriodStart(toLocalDateTime(stripeSub.currentPeriodStart()));
subRecord.setCurrentPeriodEnd(toLocalDateTime(stripeSub.currentPeriodEnd()));
subRecord.setLastWebhookEventId(event.getId());
subRecord.setLastEventCreated(event.getCreated());
boolean insertSucceeded = false;
try {
billingPort.saveSubscription(subRecord);
insertSucceeded = true;
} catch (DuplicateKeyException e) {
subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord == null) {
throw new IllegalStateException("DuplicateKeyException but re-query returned null for subscriptionId " + subscriptionId, e);
}
log.info("checkout.session.completed: duplicate insert caught, re-queried existing record for subscriptionId {}", subscriptionId);
}
if (!insertSucceeded) {
int claimed = billingPort.claimWebhookEvent(subRecord.getId(), event.getId());
if (claimed == 0) {
log.info("checkout.session.completed: event {} already claimed by concurrent thread, skipping", event.getId());
return;
}
}
log.info("Created subscription record for user {}, plan {}, subscriptionId {}", userId, plan, subscriptionId);
} else {
int claimed = billingPort.claimWebhookEvent(subRecord.getId(), event.getId());
if (claimed == 0) {
log.info("checkout.session.completed: event {} already claimed by concurrent thread for existing record, skipping", event.getId());
return;
}
}
scheduleAfterCommit(() -> updateUserLevel(userId, plan.getValue(), "checkout.session.completed", event.getId()));
}
void doUpgradeSubscription(StripeSubscription existingSub,
SubscriptionInfo updated,
String newPriceId,
SubscriptionPlan newPlan,
BillingCycle billingCycle) {
String oldPlan = existingSub.getPlan();
existingSub.setPlan(newPlan.getValue());
existingSub.setBillingCycle(billingCycle.getValue());
existingSub.setStripePriceId(updated.priceId());
existingSub.setStatus(updated.status());
existingSub.setCancelAtPeriodEnd(updated.cancelAtPeriodEnd());
existingSub.setCurrentPeriodStart(toLocalDateTime(updated.currentPeriodStart()));
existingSub.setCurrentPeriodEnd(toLocalDateTime(updated.currentPeriodEnd()));
existingSub.setLastOperationSource("upgrade_" + System.currentTimeMillis());
billingPort.updateSubscription(existingSub);
scheduleAfterCommit(() -> updateUserLevel(
existingSub.getUserId(), newPlan.getValue(), "subscription_upgrade"));
log.info("Upgraded subscription {} for user {}: {} -> {}, priceId {}",
existingSub.getStripeSubscriptionId(), existingSub.getUserId(),
oldPlan, newPlan, newPriceId);
}
SubscriptionStatusResponse doCancelSubscription(StripeSubscription sub) {
sub.setCancelAtPeriodEnd(true);
sub.setCanceledAt(LocalDateTime.now(ZoneId.of("UTC")));
sub.setLastOperationSource("manual_cancel_" + System.currentTimeMillis());
billingPort.updateSubscription(sub);
return new SubscriptionStatusResponse(
sub.getPlan(),
sub.getStatus(),
sub.getCurrentPeriodEnd(),
true
);
}
void doActivateSubscriptionFromInvoice(Event event, StripeSubscription subRecord) {
long eventCreated = event.getCreated();
int rows = billingPort.atomicUpdateSubscription(subRecord.getId(), event.getId(), "active", eventCreated);
if (rows == 0) {
log.info("invoice.payment_succeeded: already processed event {}, skipping", event.getId());
return;
}
scheduleAfterCommit(() -> updateUserLevel(
subRecord.getUserId(), subRecord.getPlan(),
"invoice.payment_succeeded -> fallback activate", event.getId()));
log.info("invoice.payment_succeeded: fallback activated subscription {} for user {}",
subRecord.getStripeSubscriptionId(), subRecord.getUserId());
}
void doCreateSubscriptionFromOrphanedInvoice(Event event, Long userId,
String subscriptionId,
SubscriptionInfo stripeSub,
StripeCustomer customer,
com.stripe.model.Invoice invoice) {
// 双重检查:并发情况下可能已被 checkout.session.completed 创建
StripeSubscription existing = billingPort.findSubscriptionByStripeId(subscriptionId);
if (existing != null
&& ("active".equals(existing.getStatus()) || "trialing".equals(existing.getStatus()))) {
log.info("invoice.payment_succeeded: orphaned path race, subscription {} already active, skipping",
subscriptionId);
return;
}
StripeSubscription subRecord = new StripeSubscription();
subRecord.setUserId(userId);
subRecord.setStripeCustomerId(customer.getStripeCustomerId());
subRecord.setStripeSubscriptionId(subscriptionId);
subRecord.setLastWebhookEventId(event.getId());
subRecord.setLastEventCreated(event.getCreated());
subRecord.setStripePriceId(stripeSub.priceId());
subRecord.setStatus(stripeSub.status());
subRecord.setCancelAtPeriodEnd(stripeSub.cancelAtPeriodEnd());
subRecord.setCurrentPeriodStart(toLocalDateTime(stripeSub.currentPeriodStart()));
subRecord.setCurrentPeriodEnd(toLocalDateTime(stripeSub.currentPeriodEnd()));
// 从 Stripe Subscription metadata 或 invoice metadata 推断 plan
String planStr = null;
// Try to get metadata from the original event object
try {
var deserializer = event.getDataObjectDeserializer();
var obj = deserializer.getObject().orElse(null);
if (obj instanceof com.stripe.model.Subscription s && s.getMetadata() != null) {
planStr = s.getMetadata().get("plan");
}
} catch (Exception ignored) {}
if (planStr == null && invoice.getMetadata() != null) {
planStr = invoice.getMetadata().get("plan");
}
if (planStr == null) {
planStr = "PRO";
log.warn("invoice.payment_succeeded: orphaned invoice {} has no plan metadata, defaulting to PRO", event.getId());
}
subRecord.setPlan(planStr);
String billingCycleStr = "monthly";
try {
var deserializer = event.getDataObjectDeserializer();
var obj = deserializer.getObject().orElse(null);
if (obj instanceof com.stripe.model.Subscription s && s.getMetadata() != null) {
billingCycleStr = s.getMetadata().getOrDefault("billingCycle", "monthly");
}
} catch (Exception ignored) {}
if (invoice.getMetadata() != null && invoice.getMetadata().containsKey("billingCycle")) {
billingCycleStr = invoice.getMetadata().get("billingCycle");
}
subRecord.setBillingCycle(billingCycleStr);
boolean insertSucceeded = false;
try {
billingPort.saveSubscription(subRecord);
insertSucceeded = true;
} catch (DuplicateKeyException e) {
subRecord = billingPort.findSubscriptionByStripeId(subscriptionId);
if (subRecord == null) {
throw new IllegalStateException("DuplicateKeyException but re-query returned null for subscriptionId " + subscriptionId, e);
}
}
// Capture final reference for lambda
final StripeSubscription savedRecord = subRecord;
if (!insertSucceeded) {
int claimed = billingPort.claimWebhookEvent(savedRecord.getId(), event.getId());
if (claimed == 0) {
log.info("invoice.payment_succeeded: event {} already claimed by concurrent thread, skipping", event.getId());
return;
}
}
scheduleAfterCommit(() -> updateUserLevel(
userId, savedRecord.getPlan(), "invoice.payment_succeeded -> orphaned create", event.getId()));
}
// ==================== 内部工具方法 ====================
/**
* 泛型反序列化:提取 Stripe 对象,减少重复代码
*/
private <T> T deserializeStripeObject(Event event, Class<T> clazz, String eventType) {
try {
var deserializer = event.getDataObjectDeserializer();
@SuppressWarnings("unchecked")
T obj = (T) deserializer.getObject().orElse(null);
if (obj == null) {
obj = clazz.cast(deserializer.deserializeUnsafe());
if (obj == null) {
log.warn("{}: failed to deserialize event object", eventType);
return null;
}
}
return obj;
} catch (Exception e) {
log.error("{}: deserialization error", eventType, e);
return null;
}
}
/**
* 内部 record:从事件反序列化的 session 信息
*/
record SessionInfo(String id, String subscriptionId, Map<String, String> metadata) {}
private SessionInfo deserializeSessionInfo(Event event, String eventType) {
try {
var deserializer = event.getDataObjectDeserializer();
var obj = deserializer.getObject().orElse(null);
if (obj == null) {
obj = deserializer.deserializeUnsafe();
}
if (obj instanceof com.stripe.model.checkout.Session session) {
return new SessionInfo(session.getId(), session.getSubscription(), session.getMetadata());
}
log.warn("{}: failed to deserialize event object", eventType);
return null;
} catch (Exception e) {
log.error("{}: deserialization error", eventType, e);
return null;
}
}
private SubscriptionInfo deserializeSubscriptionInfo(Event event, String eventType) {
try {
var deserializer = event.getDataObjectDeserializer();
var obj = deserializer.getObject().orElse(null);
if (obj == null) {
obj = deserializer.deserializeUnsafe();
}
if (obj instanceof com.stripe.model.Subscription sub) {
String firstItemId = Optional.ofNullable(sub.getItems())
.map(items -> items.getData())
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).getId())
.orElse(null);
String priceId = Optional.ofNullable(sub.getItems())
.map(items -> items.getData())
.filter(list -> !list.isEmpty())
.map(list -> list.get(0))
.map(item -> item.getPrice())
.filter(price -> price != null)
.map(price -> price.getId())
.orElse(null);
return new SubscriptionInfo(
sub.getId(), sub.getStatus(),
sub.getCurrentPeriodStart(), sub.getCurrentPeriodEnd(),
sub.getCancelAtPeriodEnd(), firstItemId, priceId, sub.getCanceledAt());
}
log.warn("{}: failed to deserialize event object", eventType);
return null;
} catch (Exception e) {
log.error("{}: deserialization error", eventType, e);
return null;
}
}
private StripeCustomer getOrCreateCustomer(Long userId) {
StripeCustomer existing = billingPort.findCustomerByUserIdAndNotDeleted(userId);
if (existing != null) {
return existing;
}
User user = userRepositoryPort.findById(userId).orElse(null);
if (user == null) {
throw new RuntimeException("用户不存在,userId=" + userId);
}
try {
CustomerInfo customer = paymentPort.createCustomer(user.getEmail());
StripeCustomer newCustomer = new StripeCustomer();
newCustomer.setUserId(userId);
newCustomer.setStripeCustomerId(customer.id());
billingPort.saveCustomer(newCustomer);
log.info("Created Stripe Customer for user {}: {}", userId, customer.id());
return newCustomer;
} catch (Exception e) {
log.error("Failed to create Stripe Customer for user {}: {}", userId, e.getMessage(), e);
throw new RuntimeException("创建 Stripe 客户失败", e);
}
}
private String getPriceId(SubscriptionPlan plan, BillingCycle billingCycle) {
Map<String, StripeProperties.PlanPrices> prices = stripeProperties.getPrices();
if (prices == null) {
throw new RuntimeException("Stripe prices not configured");
}
StripeProperties.PlanPrices planPrices = prices.get(plan.getValue().toLowerCase());
if (planPrices == null) {
throw new RuntimeException("No Stripe prices configured for plan: " + plan);
}
String priceId = switch (billingCycle) {
case MONTHLY -> planPrices.getMonthlyPriceId();
case YEARLY -> planPrices.getYearlyPriceId();
};
if (priceId == null || priceId.isBlank()) {
throw new RuntimeException("No price configured for " + plan + " " + billingCycle);
}
return priceId;
}
private SubscriptionPlan validatePlan(String plan) {
try {
return SubscriptionPlan.fromValue(plan);
} catch (IllegalArgumentException e) {
String validValues = String.join(", ", SubscriptionPlan.VALUES);
throw new IllegalArgumentException("无效的套餐类型: " + plan + ",可选值: " + validValues);
}
}
private BillingCycle validateBillingCycle(String billingCycle) {
try {
return BillingCycle.fromValue(billingCycle);
} catch (IllegalArgumentException e) {
String validValues = String.join(", ", BillingCycle.VALUES);
throw new IllegalArgumentException("无效的计费周期: " + billingCycle + ",可选值: " + validValues);
}
}
/**
* 将 Stripe epoch 秒数转换为 UTC LocalDateTime
*/
private LocalDateTime toLocalDateTime(Long epochSeconds) {
if (epochSeconds == null) {
return null;
}
return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.of("UTC"));
}
/**
* 注册在事务提交后执行的回调,用于 Redis 幂等标记等非事务操作。
*/
private void scheduleAfterCommit(Runnable action) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
action.run();
}
});
} else {
action.run();
}
}
/**
* 基于 Redis SETNX 的事件级幂等检查
*/
private boolean markEventProcessed(String eventId) {
String key = "webhook:event_processed:" + eventId;
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", java.time.Duration.ofHours(24));
return Boolean.TRUE.equals(success);
}
/**
* 同步用户等级(状态变更后调用)
*/
private void syncUserLevelAfterStatusChange(StripeSubscription subRecord, String newStatus, Event event) {
String targetLevel;
if ("active".equals(newStatus) || "trialing".equals(newStatus)) {
targetLevel = subRecord.getPlan();
} else if ("past_due".equals(newStatus)) {
log.warn("subscription {}: past_due, not downgrading yet (grace period)", subRecord.getStripeSubscriptionId());
return;
} else if ("canceled".equals(newStatus) || "unpaid".equals(newStatus)) {
targetLevel = "FREE";
} else if ("paused".equals(newStatus)) {
log.info("subscription {}: paused, keeping userLevel unchanged", subRecord.getStripeSubscriptionId());
return;
} else {
log.info("subscription {}: unknown status {}, keeping userLevel unchanged", subRecord.getStripeSubscriptionId(), newStatus);
return;
}
scheduleAfterCommit(() -> updateUserLevel(subRecord.getUserId(), targetLevel,
"subscription.updated -> " + newStatus, event.getId()));
}
private void updateUserLevel(Long userId, String newLevel, String reason, String eventId) {
if (eventId != null && !markEventProcessed(eventId)) {
log.info("updateUserLevel: event {} already processed, skipping", eventId);
return;
}
updateUserLevel(userId, newLevel, reason);
}
private void updateUserLevel(Long userId, String newLevel, String reason) {
User user = userRepositoryPort.findById(userId).orElse(null);
if (user == null) {
log.error("Cannot update userLevel: user {} not found", userId);
return;
}
String oldLevel = user.getUserLevel();
if (newLevel.equals(oldLevel)) {
return;
}
user.setUserLevel(newLevel);
userRepositoryPort.update(user);
UserPlanHistory history = new UserPlanHistory();
history.setUserId(userId);
history.setOldPlan(oldLevel != null ? oldLevel : "UNKNOWN");
history.setNewPlan(newLevel);
history.setNote(reason);
userRepositoryPort.savePlanHistory(history);
log.info("User {} level changed: {} -> {} (reason: {})", userId, oldLevel, newLevel, reason);
// 降级到 FREE 时:通过端口吊销令牌
if ("FREE".equals(newLevel) && !"FREE".equals(oldLevel)) {
tokenRevocationPort.revokeAllTokens(userId, user.getEmail(), "subscription_downgrade: " + reason);
}
}
}