TranslationApplicationService.java
package com.yumu.noveltranslator.application.service;
import com.alibaba.fastjson2.JSON;
import com.yumu.noveltranslator.port.dto.translation.SelectionTranslateResponse;
import com.yumu.noveltranslator.port.dto.translation.ReaderTranslateResponse;
import com.yumu.noveltranslator.port.dto.translation.SelectionTranslationRequest;
import com.yumu.noveltranslator.port.dto.translation.ReaderTranslateRequest;
import com.yumu.noveltranslator.port.dto.translation.WebpageTranslateRequest;
import com.yumu.noveltranslator.domain.model.User;
import com.yumu.noveltranslator.enums.TranslationMode;
import com.yumu.noveltranslator.domain.service.TranslationPipeline;
import com.yumu.noveltranslator.application.service.RagTranslationApplicationService;
import com.yumu.noveltranslator.domain.service.TranslationPostProcessingService;
import com.yumu.noveltranslator.domain.service.EntityConsistencyService;
import com.yumu.noveltranslator.domain.service.QuotaService;
import com.yumu.noveltranslator.domain.util.EngineAliasRegistry;
import com.yumu.noveltranslator.port.out.TranslationCachePort;
import com.yumu.noveltranslator.port.out.TranslationClientPort;
import com.yumu.noveltranslator.port.out.TeamTranslationPort;
import com.yumu.noveltranslator.util.CacheKeyUtil;
import com.yumu.noveltranslator.util.SseEmitterUtil;
import com.yumu.noveltranslator.util.TextCleaningUtil;
import com.yumu.noveltranslator.util.TextSegmentationUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 翻译服务类
* 提供三种翻译模式:选中文本翻译、阅读器翻译、网页翻译
*
* 优化特性:
* - 三级缓存:内存缓存 → 数据库缓存 → 翻译服务
* - 改进的缓存 Key:使用 MD5 避免碰撞
* - 全局线程池复用
* - 批量翻译优化
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class TranslationApplicationService implements com.yumu.noveltranslator.port.in.TranslatePort {
private static final String DEFAULT_TARGET_LANG = "zh";
private static final String DEFAULT_ENGINE = "auto";
/**
* 语言代码 → 语言名称映射(用于团队翻译 prompt)
* Python 侧 prompt 使用自然语言名称(如 "请将以下中文文本翻译为日文")
*/
private static final Map<String, String> LANGUAGE_NAME_MAP = Map.ofEntries(
Map.entry("zh", "中文"),
Map.entry("cn", "中文"),
Map.entry("en", "英文"),
Map.entry("ja", "日文"),
Map.entry("jp", "日文"),
Map.entry("ko", "韩文"),
Map.entry("fr", "法文"),
Map.entry("de", "德文"),
Map.entry("es", "西班牙文"),
Map.entry("pt", "葡萄牙文"),
Map.entry("ru", "俄文"),
Map.entry("it", "意大利文"),
Map.entry("ar", "阿拉伯文"),
Map.entry("th", "泰文"),
Map.entry("vi", "越南文")
);
/**
* 将语言代码转换为自然语言名称
* 例如 "zh" → "中文", "ja" → "日文"
*/
private static String toLanguageName(String code) {
if (code == null || "auto".equalsIgnoreCase(code)) {
return "未知";
}
return LANGUAGE_NAME_MAP.getOrDefault(code.toLowerCase(), code);
}
// 依赖注入
private final TranslationClientPort translationClientPort;
private final TranslationCachePort cachePort;
private final RagTranslationApplicationService ragTranslationService;
private final com.yumu.noveltranslator.domain.service.EntityConsistencyService entityConsistencyService;
private final TranslationPostProcessingService postProcessingService;
private final TeamTranslationPort teamTranslationPort;
private final com.yumu.noveltranslator.domain.service.QuotaService quotaService;
/**
* 选中文本翻译
* @param req 翻译请求
* @return 翻译响应
*/
public SelectionTranslateResponse selectionTranslate(Long userId, SelectionTranslationRequest req) {
String combined = req.getText();
if (combined == null || combined.trim().isEmpty()) {
return new SelectionTranslateResponse(false, req.getEngine(), "内容为空");
}
TranslationMode mode = EngineAliasRegistry.normalizeToMode(req.getEngine());
String target = req.getTargetLang() == null ? DEFAULT_TARGET_LANG : req.getTargetLang();
// 检查字符配额
if (userId != null) {
String userLevel = com.yumu.noveltranslator.util.SecurityUtil.getCurrentUserLevel().orElse(null);
if (userLevel != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
if (!quotaService.tryConsumeChars(userId, userLevel, combined.length(), modeName)) {
return new SelectionTranslateResponse(false, mode.getName(), "字符配额不足,请升级档位或等待下月重置");
}
}
}
try {
TranslationPipeline pipeline = new TranslationPipeline(
cachePort, ragTranslationService, entityConsistencyService,
translationClientPort, postProcessingService, userId, null);
String result = pipeline.executeFast(combined, target, mode);
if (result == null || result.trim().isEmpty()) {
if (userId != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
quotaService.refundChars(userId, combined.length(), modeName);
}
return new SelectionTranslateResponse(false, mode.getName(), "翻译失败:返回结果为空");
}
// 净化翻译结果,防止恶意 HTML/脚本注入(XSS 防护)
String sanitized = TextCleaningUtil.sanitizeHtml(result);
return new SelectionTranslateResponse(true, mode.getName(), sanitized);
} catch (Exception e) {
log.error("选中文本翻译失败:{}", e.getMessage(), e);
if (userId != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
quotaService.refundChars(userId, combined.length(), modeName);
}
return new SelectionTranslateResponse(false, mode.getName(), "翻译失败:服务器内部错误");
}
}
/**
* 阅读器翻译 - 支持并行翻译长文本
* 默认使用 MTranServer 进行翻译,html 模式启用
* @param req 翻译请求
* @return 翻译响应
*/
public ReaderTranslateResponse readerTranslate(Long userId, ReaderTranslateRequest req) {
String content = req.getContent();
if (content == null || content.trim().isEmpty()) {
return new ReaderTranslateResponse(false, req.getEngine(), "没有收到内容");
}
String target = req.getTargetLang() == null ? DEFAULT_TARGET_LANG : req.getTargetLang();
TranslationMode mode = EngineAliasRegistry.normalizeToMode(req.getEngine());
// 检查字符配额
if (userId != null) {
String userLevel = com.yumu.noveltranslator.util.SecurityUtil.getCurrentUserLevel().orElse(null);
if (userLevel != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
if (!quotaService.tryConsumeChars(userId, userLevel, content.length(), modeName)) {
return new ReaderTranslateResponse(false, mode.getName(), "字符配额不足,请升级档位或等待下月重置");
}
}
}
// 文本分段
List<String> segments;
try {
segments = TextSegmentationUtil.segmentByTextEngine(content, mode.getName());
} catch (Exception e) {
log.error("阅读器文本分段失败:{}", e.getMessage(), e);
if (userId != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
quotaService.refundChars(userId, content.length(), modeName);
}
return new ReaderTranslateResponse(false, mode.getName(), "文本分段失败:服务器内部错误");
}
if (segments.isEmpty()) {
if (userId != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
quotaService.refundChars(userId, content.length(), modeName);
}
return new ReaderTranslateResponse(false, mode.getName(), "段落为空");
}
// 并行翻译所有段落(复用全局线程池)
// 阅读器模式默认使用 MTranServer,html=true
try {
List<String> translatedSegments = translateSegmentsInParallel(segments, target, mode, true, userId);
String rawResult = String.join("", translatedSegments);
String combinedResult = TextCleaningUtil.sanitizeHtml(rawResult);
log.info("阅读器翻译完成,原文长度:{},译文长度:{}", content.length(), combinedResult.length());
return new ReaderTranslateResponse(true, mode.getName(), combinedResult);
} catch (Exception e) {
log.error("阅读器翻译失败:{}", e.getMessage(), e);
if (userId != null) {
String modeName = req.getMode() != null ? req.getMode() : "fast";
quotaService.refundChars(userId, content.length(), modeName);
}
return new ReaderTranslateResponse(false, mode.getName(), "翻译失败:服务器内部错误");
}
}
/**
* 并行翻译文本段落(使用虚拟线程并发翻译)
*
* 优化策略:
* 1. 先尝试从缓存获取每个段落
* 2. 对缓存未命中的段落使用虚拟线程并发翻译
* 3. 翻译结果写回缓存
*
* @param segments 待翻译的文本段落
* @param target 目标语言
* @param engine 翻译引擎
* @param html 是否启用 HTML 翻译模式(仅对 MTranServer 有效)
*/
private List<String> translateSegmentsInParallel(List<String> segments, String target, TranslationMode mode,
boolean html, Long userId) {
List<String> results = new ArrayList<>(segments.size());
List<int[]> indexesToTranslate = new ArrayList<>();
// 1. 先尝试从缓存获取(分层缓存)
for (int i = 0; i < segments.size(); i++) {
String segment = segments.get(i);
String cacheKey = CacheKeyUtil.buildCacheKey(segment, target);
String cached = cachePort.getCacheByMode(cacheKey, mode.getName()).orElse(null);
if (cached != null) {
results.add(cached);
log.debug("阅读器缓存命中:索引 {} mode={}", i, mode.getName());
} else {
results.add(null); // 占位,稍后填充
indexesToTranslate.add(new int[]{i});
}
}
// 2. 使用 Pipeline 快速模式并发翻译缓存未命中的段落
if (!indexesToTranslate.isEmpty()) {
TranslationPipeline pipeline = new TranslationPipeline(
cachePort, ragTranslationService, entityConsistencyService,
translationClientPort, postProcessingService, userId, null);
List<Runnable> tasks = new ArrayList<>();
List<String> tempResults = new ArrayList<>(indexesToTranslate.size());
for (int j = 0; j < indexesToTranslate.size(); j++) {
tempResults.add(null);
}
for (int j = 0; j < indexesToTranslate.size(); j++) {
int i = indexesToTranslate.get(j)[0];
final int index = i;
final int taskIndex = j;
String segment = segments.get(i);
tasks.add(() -> {
try {
String translation = pipeline.executeFast(segment, target, mode, html);
if (translation != null && !translation.trim().isEmpty()) {
tempResults.set(taskIndex, translation);
} else {
tempResults.set(taskIndex, segment);
}
} catch (Exception e) {
log.error("翻译段落失败(使用原文兜底):索引 {}, 错误:{}", index, e.getMessage());
tempResults.set(taskIndex, segment);
}
});
}
// 使用虚拟线程并发执行所有翻译任务
List<Thread> threads = new ArrayList<>();
for (Runnable task : tasks) {
Thread thread = Thread.startVirtualThread(task);
threads.add(thread);
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待翻译线程被中断");
}
}
// 填充翻译结果
for (int j = 0; j < indexesToTranslate.size(); j++) {
int i = indexesToTranslate.get(j)[0];
String translation = tempResults.get(j);
if (translation != null) {
results.set(i, translation);
}
}
}
// 3. 清理 null 值
for (int i = 0; i < results.size(); i++) {
if (results.get(i) == null) {
results.set(i, segments.get(i));
log.warn("发现未处理的 null 值,使用原文填充:索引 {}", i);
}
}
return results;
}
/**
* 网页翻译流式版本 - 并发翻译(虚拟线程 + 信号量限流)
*/
public SseEmitter webpageTranslateStream(Long userId, WebpageTranslateRequest req) {
SseEmitter emitter = SseEmitterUtil.createSseEmitter(300000L); // 5 分钟超时
// 计算总字符数
var items = req.getTextRegistry();
int totalChars = items.stream()
.mapToInt(item -> item.getOriginal() != null ? item.getOriginal().length() : 0)
.sum();
// 检查字符配额
String quotaMode = (req.getFastMode() != null && !req.getFastMode()) ? "expert" : "fast";
if (userId != null) {
String userLevel = com.yumu.noveltranslator.util.SecurityUtil.getCurrentUserLevel().orElse(null);
if (userLevel != null) {
if (!quotaService.tryConsumeChars(userId, userLevel, totalChars, quotaMode)) {
SseEmitterUtil.sendError(emitter, "字符配额不足,请升级档位或等待下月重置");
SseEmitterUtil.complete(emitter);
return emitter;
}
}
}
Thread.startVirtualThread(() -> {
try {
String target = req.getTargetLang() == null ? DEFAULT_TARGET_LANG : req.getTargetLang();
TranslationMode mode = EngineAliasRegistry.normalizeToMode(req.getEngine());
int totalCount = items.size();
log.info("[SSE流式翻译] 开始处理,文本数量: {}, 引擎: {}, fastMode: {}", totalCount, req.getEngine(), req.getFastMode());
TranslationPipeline pipeline = new TranslationPipeline(
cachePort, ragTranslationService, entityConsistencyService,
translationClientPort, postProcessingService, userId, null);
log.info("[SSE流式翻译] Pipeline 初始化完成");
// 限制并发数,避免超过 DeepSeek 限流(10 req/s)和用户级别并发限制
int maxConcurrency = 5;
var semaphore = new java.util.concurrent.Semaphore(maxConcurrency);
var barrier = new java.util.concurrent.CountDownLatch(totalCount);
for (WebpageTranslateRequest.TextItem item : items) {
// 用 final 局部变量捕获当前迭代的 item,防止 lambda 闭包在多线程环境下共享循环变量
final WebpageTranslateRequest.TextItem currentItem = item;
Thread.startVirtualThread(() -> {
try {
semaphore.acquire();
String id = currentItem.getId() == null ? "" : currentItem.getId();
String original = currentItem.getOriginal() == null ? "" : currentItem.getOriginal();
String cleanText = TextCleaningUtil.cleanText(original);
// fastMode=true(默认)使用 MTranServer,fastMode=false(专家模式)使用 DeepSeek
boolean fastMode = req.getFastMode() == null || req.getFastMode();
String extracted = pipeline.executeFast(cleanText, target, mode, !fastMode);
if (extracted == null || extracted.trim().isEmpty()) {
log.warn("翻译失败,使用原文兜底:ID={}", id);
extracted = original;
}
Map<String, Object> eventData = new HashMap<>(3);
// 净化翻译结果,防止恶意 HTML/脚本注入(XSS 防护)
String sanitized = TextCleaningUtil.sanitizeHtml(extracted);
eventData.put("textId", id);
eventData.put("original", original);
eventData.put("translation", sanitized);
SseEmitterUtil.sendData(emitter, JSON.toJSONString(eventData));
} catch (Exception e) {
log.error("翻译失败 - ID: {}, 错误: {}", currentItem.getId(), e.getMessage());
Map<String, Object> errorData = new HashMap<>(3);
errorData.put("textId", currentItem.getId());
String orig = currentItem.getOriginal() == null ? "" : currentItem.getOriginal();
errorData.put("original", orig);
errorData.put("translation", TextCleaningUtil.sanitizeHtml(orig));
SseEmitterUtil.sendData(emitter, JSON.toJSONString(errorData));
} finally {
semaphore.release();
barrier.countDown();
}
});
}
// 心跳线程:每 30 秒发送一次心跳,使用注册表检测 emitter 是否仍然活跃
String emitterId = SseEmitterUtil.registerEmitter(emitter);
Thread.startVirtualThread(() -> {
while (SseEmitterUtil.isEmitterActive(emitterId)) {
try {
Thread.sleep(30_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
SseEmitterUtil.sendHeartbeat(emitter);
}
});
log.info("[SSE流式翻译] 所有翻译线程已启动,等待完成 ({} 个文本)", totalCount);
// 等待所有翻译完成
barrier.await();
log.info("[SSE流式翻译] 所有翻译已完成,发送 [DONE] 信号");
SseEmitterUtil.sendDone(emitter);
SseEmitterUtil.complete(emitter);
} catch (Exception e) {
// 翻译失败,触发心跳停止
log.error("网页翻译失败:{}", e.getMessage(), e);
if (userId != null) {
quotaService.refundChars(userId, totalChars, quotaMode);
}
SseEmitterUtil.sendError(emitter, "翻译失败:服务器内部错误");
SseEmitterUtil.complete(emitter);
}
});
return emitter;
}
/**
* 文本流式翻译(SSE)— 适用于长文本的单段流式输出
* 按段落分段,逐段 SSE 输出
*/
public SseEmitter streamTextTranslate(Long userId, SelectionTranslationRequest req) {
log.info("[STREAM-TRACE] Service entry: userId={}, engine={}, target={}, mode={}, textLen={}",
userId, req.getEngine(), req.getTargetLang(), req.getMode(), req.getText() != null ? req.getText().length() : 0);
SseEmitter emitter = SseEmitterUtil.createSseEmitter(300000L);
String text = req.getText();
if (text == null || text.trim().isEmpty()) {
log.info("[STREAM-TRACE] Text empty, returning error");
SseEmitterUtil.sendError(emitter, "文本不能为空");
SseEmitterUtil.complete(emitter);
return emitter;
}
TranslationMode mode = EngineAliasRegistry.normalizeToMode(req.getEngine());
String target = req.getTargetLang() == null ? DEFAULT_TARGET_LANG : req.getTargetLang();
String modeString = req.getMode() != null ? req.getMode() : "fast";
log.info("[STREAM-TRACE] Normalized: mode={}, target={}", mode.getName(), target);
// 检查字符配额(从 SecurityContext 获取 userLevel)
if (userId != null) {
String userLevel = com.yumu.noveltranslator.util.SecurityUtil.getCurrentUserLevel().orElse(null);
if (userLevel != null) {
if (!quotaService.tryConsumeChars(userId, userLevel, text.length(), modeString)) {
SseEmitterUtil.sendError(emitter, "字符配额不足,请升级档位或等待下月重置");
SseEmitterUtil.complete(emitter);
return emitter;
}
}
}
Thread.startVirtualThread(() -> {
try {
List<String> segments = TextSegmentationUtil.segmentByTextEngine(text, mode.getName());
if (segments.isEmpty()) {
segments = List.of(text);
}
TranslationPipeline pipeline = new TranslationPipeline(
cachePort, ragTranslationService, entityConsistencyService,
translationClientPort, postProcessingService, userId, null);
// 先用 \n\n+ 将全文拆分为逻辑段落
String[] logicalParagraphs = text.split("\n\n+");
log.info("[STREAM-TRACE] 全文拆分为 {} 个逻辑段落, 总行数 ~{}", logicalParagraphs.length,
text.split("\n", -1).length);
StringBuilder fullResult = new StringBuilder();
for (int i = 0; i < logicalParagraphs.length; i++) {
SseEmitterUtil.sendHeartbeat(emitter);
String para = logicalParagraphs[i];
if (para.isBlank()) {
fullResult.append("\n\n");
continue;
}
// 段落内按 \n 逐行翻译,保持原文单行换行结构
String[] lines = para.split("\n", -1);
StringBuilder paraResult = new StringBuilder();
log.info("[流式翻译] 段落包含 {} 行", lines.length);
for (int j = 0; j < lines.length; j++) {
String line = lines[j];
if (line.isEmpty()) {
paraResult.append("\n");
continue;
}
String translated;
String pipelineMode = "fast".equals(modeString) ? "executeFast" : "execute";
log.info("[STREAM-TRACE] Pipeline 委托: pipelineMode={}, lineLen={}, para={}, line={}",
pipelineMode, line.length(), i, j);
if ("fast".equals(modeString)) {
translated = pipeline.executeFast(line, target, mode);
} else {
translated = pipeline.execute(line, target, mode);
}
log.info("[STREAM-TRACE] Pipeline 返回: translatedLen={}", translated != null ? translated.length() : 0);
if (translated == null || translated.isBlank()) {
log.info("[STREAM-TRACE] Pipeline 返回空,使用原文兜底");
translated = line;
}
if (j > 0) paraResult.append("\n");
paraResult.append(translated);
}
log.info("[流式翻译] 段落结果行数: {} (与原文一致)", paraResult.toString().split("\n", -1).length);
if (i > 0) fullResult.append("\n\n");
fullResult.append(paraResult);
// 流式发送当前段落结果(含段间分隔符)
// §NL§ = 单换行, §NL§§NL§ = 段落分隔
String chunk = TextCleaningUtil.sanitizeHtml(paraResult.toString()).replace("\n", "§NL§");
if (i > 0) {
chunk = "§NL§§NL§" + chunk;
}
SseEmitterUtil.sendData(emitter, chunk);
}
log.info("[STREAM-TRACE] 流式翻译完成, 原文长度={}, 译文长度={}", text.length(), fullResult.length());
SseEmitterUtil.sendDone(emitter);
SseEmitterUtil.complete(emitter);
} catch (Exception e) {
log.error("文本流式翻译失败:{}", e.getMessage(), e);
Long currentUserId = userId;
if (currentUserId != null) {
quotaService.refundChars(currentUserId, text.length(), modeString);
}
SseEmitterUtil.sendError(emitter, "翻译失败:服务器内部错误");
SseEmitterUtil.complete(emitter);
}
});
return emitter;
}
/**
* 获取缓存统计信息
*/
public Map<String, Object> getCacheStats() {
return cachePort.getCacheStats();
}
}