TranslationTaskApplicationService.java

package com.yumu.noveltranslator.application.service;

import com.yumu.noveltranslator.port.dto.translation.TranslationResultResponse;
import com.yumu.noveltranslator.port.dto.entity.TaskStatusResponse;
import com.yumu.noveltranslator.port.dto.entity.TranslationHistoryResponse;
import com.yumu.noveltranslator.domain.model.Document;
import com.yumu.noveltranslator.domain.model.Glossary;
import com.yumu.noveltranslator.domain.model.TranslationHistory;
import com.yumu.noveltranslator.domain.model.TranslationTask;
import com.yumu.noveltranslator.enums.TranslationMode;
import com.yumu.noveltranslator.enums.TranslationStatus;
import com.yumu.noveltranslator.port.out.TranslationRepositoryPort;
import com.yumu.noveltranslator.port.out.DocumentRepositoryPort;
import com.yumu.noveltranslator.port.out.GlossaryRepositoryPort;
import com.yumu.noveltranslator.port.out.TranslationCachePort;
import com.yumu.noveltranslator.port.out.TranslationClientPort;
import com.yumu.noveltranslator.domain.service.TranslationStateMachine;
import com.yumu.noveltranslator.domain.service.TranslationPipeline;
import com.yumu.noveltranslator.application.service.RagTranslationApplicationService;
import com.yumu.noveltranslator.domain.service.EntityConsistencyService;
import com.yumu.noveltranslator.domain.service.TranslationPostProcessingService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yumu.noveltranslator.util.ExternalResponseUtil;
import com.yumu.noveltranslator.util.SseEmitterUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.scheduling.annotation.Scheduled;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * 翻译任务服务
 */
@Service
@RequiredArgsConstructor
public class TranslationTaskApplicationService implements com.yumu.noveltranslator.port.in.TranslationTaskPort {
    private static final Logger log = LoggerFactory.getLogger(TranslationTaskApplicationService.class);

    private final TranslationRepositoryPort translationPort;
    private final DocumentRepositoryPort documentPort;
    private final GlossaryRepositoryPort glossaryPort;
    private final TranslationStateMachine stateMachine;
    private final TranslationClientPort translationClientPort;
    private final TranslationCachePort cachePort;
    private final RagTranslationApplicationService ragTranslationService;
    private final EntityConsistencyService entityConsistencyService;
    private final TranslationPostProcessingService postProcessingService;

    /**
     * 创建文档翻译任务
     */
    public TranslationTask createDocumentTask(Long userId, Document doc) {
        String taskId = generateTaskId();

        TranslationTask task = new TranslationTask();
        task.setTaskId(taskId);
        task.setUserId(userId);
        task.setType("document");
        task.setDocumentId(doc.getId());
        task.setSourceLang(doc.getSourceLang());
        task.setTargetLang(doc.getTargetLang());
        task.setMode(doc.getMode());
        task.setEngine("google");
        task.setStatus(TranslationStatus.PENDING.getValue());
        task.setProgress(0);
        task.setCreateTime(LocalDateTime.now());

        translationPort.saveTask(task);

        // 关联文档与任务
        doc.setTaskId(taskId);
        documentPort.update(doc);

        return task;
    }

    /**
     * 启动文档翻译
     */
    public void startDocumentTranslation(TranslationTask task, Document doc) {
        if (task == null || doc == null) {
            return;
        }
        if (!TranslationStatus.PENDING.getValue().equals(task.getStatus()) && !TranslationStatus.FAILED.getValue().equals(task.getStatus())) {
            return;
        }

        // 更新文档状态为处理中
        doc.setStatus(TranslationStatus.PROCESSING.getValue());
        doc.setUpdateTime(LocalDateTime.now());
        documentPort.update(doc);

        updateTaskProgress(task, TranslationStatus.PROCESSING, 10, null);

        executeDocumentTranslation(task, doc);
    }

    /**
     * 异步执行文档翻译(使用虚拟线程)
     * 接入三级缓存 + RAG + 实体一致性
     */
    private void executeDocumentTranslation(TranslationTask task, Document doc) {
        Thread.startVirtualThread(() -> {
            try {
                // 读取文件内容
                String content = readDocumentContent(doc.getPath(), doc.getFileType());
                if (content == null || content.trim().isEmpty()) {
                    updateTaskProgress(task, TranslationStatus.FAILED, 0, "文件内容为空");
                    return;
                }

                // 按段落分割翻译(每 3000 字符一批)
                updateTaskProgress(task, TranslationStatus.PROCESSING, 20, null);
                StringBuilder translatedContent = new StringBuilder();
                String[] paragraphs = content.split("(?<=\n)");
                int total = paragraphs.length;
                int batchStart = 0;
                String targetLang = task.getTargetLang();
                String engine = task.getEngine();
                Long userId = task.getUserId();
                String docId = "doc_" + doc.getId();

                // 加载用户术语表
                List<Glossary> glossaryTerms = loadGlossaryTermsForUser(userId, content);

                TranslationPipeline pipeline = new TranslationPipeline(
                    cachePort, ragTranslationService, entityConsistencyService,
                    translationClientPort, postProcessingService, null, userId, docId, glossaryTerms);

                while (batchStart < total) {
                    StringBuilder batch = new StringBuilder();
                    int batchEnd = batchStart;
                    while (batchEnd < total && batch.length() + paragraphs[batchEnd].length() <= 1500) {
                        batch.append(paragraphs[batchEnd]);
                        batchEnd++;
                    }

                    if (batch.length() == 0) {
                        batch.append(paragraphs[batchStart]);
                        batchEnd = batchStart + 1;
                    }

                    String batchText = batch.toString();
                    String translated;
                    try {
                        if ("fast".equals(doc.getMode())) {
                            translated = pipeline.executeFast(batchText, targetLang, TranslationMode.FAST);
                        } else {
                            translated = pipeline.execute(batchText, targetLang, TranslationMode.EXPERT);
                        }
                    } catch (Exception e) {
                        log.warn("翻译批次失败 [{}/{}],保留原文: {}", batchStart, total, e.getMessage());
                        translated = batchText; // fallback to original text
                    }
                    if (translated != null && !translated.isEmpty()) {
                        // 补回原文批次中的换行符以保持格式对齐
                        if (!translated.endsWith("\n") && !translated.endsWith("\r")) {
                            String trailingNewline = getTrailingNewline(batchText);
                            if (!trailingNewline.isEmpty()) {
                                translated += trailingNewline;
                            }
                        }
                        translatedContent.append(translated);
                    }
                    batchStart = batchEnd;

                    int progress = 20 + (int) ((batchStart * 80.0) / total);
                    updateTaskProgress(task, TranslationStatus.PROCESSING, progress, null);
                }

                // 保存翻译结果到文件
                String translatedPath = ExternalResponseUtil.buildTranslatedPath(doc.getPath());
                Files.write(Paths.get(translatedPath), translatedContent.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));

                // 更新文档状态为已完成
                doc.setStatus(TranslationStatus.COMPLETED.getValue());
                doc.setCompletedTime(LocalDateTime.now());
                doc.setUpdateTime(LocalDateTime.now());
                documentPort.update(doc);

                updateTaskProgress(task, TranslationStatus.COMPLETED, 100, null);
                saveTranslationHistory(task, content, translatedContent.toString());

            } catch (Exception e) {
                log.error("文档翻译失败: {}", e.getMessage(), e);
                updateTaskProgress(task, TranslationStatus.FAILED, 0, "翻译失败:" + e.getMessage());
            }
        });
    }

    /**
     * 读取文档内容
     */
    private String readDocumentContent(String filePath, String fileType) throws IOException {
        Path path = Paths.get(filePath);
        if (!Files.exists(path)) {
            throw new IOException("文件不存在: " + filePath);
        }

        if ("txt".equals(fileType)) {
            return Files.readString(path, java.nio.charset.StandardCharsets.UTF_8);
        } else {
            // DOCX/EPUB/PDF 暂不支持,返回原始文本
            throw new IOException("暂不支持 " + fileType.toUpperCase() + " 格式,仅支持 TXT 文件");
        }
    }

    @Transactional
    protected void updateTaskProgress(TranslationTask task, TranslationStatus status, int progress, String errorMessage) {
        // 如果任务已被取消,不再更新
        TranslationTask current = translationPort.findTaskByTaskId(task.getTaskId()).orElse(null);
        if (current != null && TranslationStatus.FAILED.getValue().equals(current.getStatus()) && "用户取消任务".equals(current.getErrorMessage())) {
            return;
        }
        task.setStatus(status.getValue());
        task.setProgress(progress);
        if (status == TranslationStatus.COMPLETED) {
            task.setCompletedTime(LocalDateTime.now());
        }
        if (errorMessage != null) {
            task.setErrorMessage(errorMessage);
        }
        translationPort.updateTask(task);
    }

    /**
     * 保存翻译历史
     */
    private void saveTranslationHistory(TranslationTask task, String sourceText, String targetText) {
        TranslationHistory history = new TranslationHistory();
        history.setUserId(task.getUserId());
        history.setTaskId(task.getTaskId());
        history.setType(task.getType());
        history.setDocumentId(task.getDocumentId());
        history.setSourceLang(task.getSourceLang());
        history.setTargetLang(task.getTargetLang());
        history.setSourceText(sourceText != null && sourceText.length() > 500
                ? sourceText.substring(0, 500)
                : sourceText);

        // 解析翻译响应,提取实际翻译内容
        String translatedContent = ExternalResponseUtil.extractDataField(targetText);
        history.setTargetText(translatedContent != null && translatedContent.length() > 500
                ? translatedContent.substring(0, 500)
                : translatedContent);
        history.setEngine(task.getEngine());
        history.setCreateTime(LocalDateTime.now());

        translationPort.saveHistory(history);
    }

    /**
     * 加载用户术语表中在原文中出现的词条
     */
    private List<Glossary> loadGlossaryTermsForUser(Long userId, String sourceText) {
        try {
            List<Glossary> allTerms = glossaryPort.findActiveGlossaryByUserId(userId);

            // 过滤只保留在原文中出现的术语
            return allTerms.stream()
                    .filter(term -> sourceText.contains(term.getSourceWord()))
                    .toList();
        } catch (Exception e) {
            log.warn("加载术语表失败: {}", e.getMessage());
            return List.of();
        }
    }

    /**
     * 根据任务 ID 获取任务
     */
    public TranslationTask getTaskByTaskId(String taskId) {
        return translationPort.findTaskByTaskId(taskId).orElse(null);
    }

    /**
     * 根据文档 ID 获取任务
     */
    public TranslationTask getTaskByDocumentId(Long docId) {
        return translationPort.findTaskByDocumentId(docId).orElse(null);
    }

    /**
     * 取消翻译任务
     */
    public boolean cancelTask(String taskId, Long userId) {
        TranslationTask task = translationPort.findTaskByTaskId(taskId).orElse(null);
        if (task != null && task.getUserId().equals(userId)) {
            if (TranslationStatus.PENDING.getValue().equals(task.getStatus()) || TranslationStatus.PROCESSING.getValue().equals(task.getStatus())) {
                task.setStatus(TranslationStatus.FAILED.getValue());
                task.setErrorMessage("用户取消任务");
                translationPort.updateTask(task);
                // 同步更新 Document 表状态
                if (task.getDocumentId() != null) {
                    Document doc = documentPort.findById(task.getDocumentId()).orElse(null);
                    if (doc != null) {
                        doc.setStatus(TranslationStatus.FAILED.getValue());
                        doc.setErrorMessage("用户取消任务");
                        doc.setUpdateTime(LocalDateTime.now());
                        documentPort.update(doc);
                    }
                }
                return true;
            }
        }
        return false;
    }

    /**
     * 删除翻译历史记录(逻辑删除)
     */
    public boolean deleteHistory(String taskId, Long userId) {
        TranslationHistory history = translationPort.findHistoryByTaskId(taskId).orElse(null);
        if (history != null && history.getUserId().equals(userId)) {
            translationPort.deleteHistory(history.getId());
            return true;
        }
        return false;
    }

    /**
     * 获取翻译结果
     */
    public TranslationResultResponse getTranslationResult(String taskId) {
        TranslationTask task = translationPort.findTaskByTaskId(taskId).orElse(null);
        if (task == null) {
            return null;
        }

        TranslationResultResponse response = new TranslationResultResponse();
        response.setTaskId(taskId);
        response.setStatus(task.getStatus());
        response.setSourceLang(task.getSourceLang());
        response.setTargetLang(task.getTargetLang());

        String status = task.getStatus();
        if (TranslationStatus.COMPLETED.getValue().equals(status) || TranslationStatus.PROCESSING.getValue().equals(status) || TranslationStatus.PENDING.getValue().equals(status) || TranslationStatus.FAILED.getValue().equals(status)) {
            if (TranslationStatus.COMPLETED.getValue().equals(status)) {
                response.setCompletedTime(task.getCompletedTime() != null
                        ? task.getCompletedTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                        : null);
            }

            // 如果是文本翻译,从历史中获取结果
            if ("text".equals(task.getType())) {
                TranslationHistory history = translationPort.findHistoryByTaskId(task.getTaskId()).orElse(null);
                if (history != null) {
                    response.setTranslatedText(history.getTargetText());
                    response.setSourceContent(history.getSourceText());
                }
            } else if ("document".equals(task.getType())) {
                // 文档翻译,读取文件内容
                if (task.getDocumentId() != null) {
                    Document doc = documentPort.findById(task.getDocumentId()).orElse(null);
                    if (doc != null) {
                        response.setTranslatedFilePath(doc.getPath());
                        // 读取原文内容(任何状态都尝试读取)
                        try {
                            Path path = Paths.get(doc.getPath());
                            if (Files.exists(path)) {
                                String content = Files.readString(path, java.nio.charset.StandardCharsets.UTF_8);
                                response.setSourceContent(content);
                            }
                        } catch (Exception e) {
                            log.warn("读取原文文件失败: {}", e.getMessage());
                        }
                        // 已完成的文档翻译,读取翻译文件内容
                        if (TranslationStatus.COMPLETED.getValue().equals(status)) {
                            String translatedPath = ExternalResponseUtil.buildTranslatedPath(doc.getPath());
                            try {
                                Path path = Paths.get(translatedPath);
                                if (Files.exists(path)) {
                                    // 文件内容已经是纯文本翻译结果,不要再次调用 extractTranslatedContent
                                    response.setTranslatedText(Files.readString(path, java.nio.charset.StandardCharsets.UTF_8));
                                }
                            } catch (Exception e) {
                                log.warn("读取翻译文件失败: {}", e.getMessage());
                            }
                        }
                    }
                }
            }
        }

        return response;
    }

    /**
     * 下载翻译结果(返回文件路径)
     */
    public String getDownloadPath(String taskId, Long userId) {
        TranslationTask task = translationPort.findTaskByTaskId(taskId).orElse(null);
        if (task == null || !task.getUserId().equals(userId)) {
            return null;
        }

        if (TranslationStatus.COMPLETED.getValue().equals(task.getStatus()) && "document".equals(task.getType())) {
            if (task.getDocumentId() != null) {
                Document doc = documentPort.findById(task.getDocumentId()).orElse(null);
                if (doc != null && Files.exists(Paths.get(doc.getPath()))) {
                    return doc.getPath();
                }
            }
        }

        return null;
    }

    /**
     * 获取翻译历史列表(包含进行中的任务)
     */
    public List<TranslationHistory> getTranslationHistory(Long userId, int page, int pageSize, String type) {
        int offset = (page - 1) * pageSize;

        // 查询进行中的任务(pending/processing),这些任务可能还没有历史记录
        List<TranslationHistory> histories = new ArrayList<>();

        // 先查询进行中的任务
        List<TranslationTask> inProgressTasks = translationPort.findTasksByUserIdAndStatus(userId, offset, pageSize);
        for (TranslationTask task : inProgressTasks) {
            TranslationHistory history = new TranslationHistory();
            history.setUserId(task.getUserId());
            history.setTaskId(task.getTaskId());
            history.setType(task.getType());
            history.setDocumentId(task.getDocumentId());
            history.setSourceLang(task.getSourceLang());
            history.setTargetLang(task.getTargetLang());
            history.setSourceText(null);
            history.setTargetText(null);
            history.setEngine(task.getEngine());
            history.setCreateTime(task.getCreateTime());
            histories.add(history);
        }

        // 再查询已完成的历史记录
        List<TranslationHistory> completedHistories = translationPort.findHistoryByUserId(userId, offset, pageSize);
        histories.addAll(completedHistories);

        // 去重(按taskId)
        histories = histories.stream()
                .collect(java.util.stream.Collectors.toMap(
                        TranslationHistory::getTaskId,
                        h -> h,
                        (existing, replacement) -> existing
                ))
                .values()
                .stream()
                .sorted((a, b) -> {
                    if (a.getCreateTime() == null) return 1;
                    if (b.getCreateTime() == null) return -1;
                    return b.getCreateTime().compareTo(a.getCreateTime());
                })
                .limit(pageSize)
                .toList();

        if (type != null && !"all".equals(type)) {
            return histories.stream()
                    .filter(h -> type.equals(h.getType()))
                    .toList();
        }

        return histories;
    }

    /**
     * 统计翻译历史总数(按类型过滤)
     */
    @Override
    public int countTranslationHistory(Long userId, String type) {
        if ("all".equals(type)) {
            return translationPort.countHistoryByUserId(userId);
        }
        return translationPort.countHistoryByUserIdAndType(userId, type);
    }

    /**
     * 生成任务 ID
     */
    private String generateTaskId() {
        return "task_" + UUID.randomUUID().toString().replace("-", "").substring(0, 16);
    }

    /**
     * 转换为 TaskStatusResponse
     */
    public TaskStatusResponse toTaskStatusResponse(TranslationTask task) {
        if (task == null) {
            return null;
        }

        TaskStatusResponse response = new TaskStatusResponse();
        response.setTaskId(task.getTaskId());
        response.setType(task.getType());
        response.setStatus(task.getStatus());
        response.setProgress(task.getProgress());
        response.setSourceLang(task.getSourceLang());
        response.setTargetLang(task.getTargetLang());
        response.setCreateTime(task.getCreateTime() != null
                ? task.getCreateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                : null);
        response.setCompletedTime(task.getCompletedTime() != null
                ? task.getCompletedTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                : null);
        response.setErrorMessage(task.getErrorMessage());

        return response;
    }

    /**
     * 转换为 TranslationHistoryResponse
     */
    public TranslationHistoryResponse toHistoryResponse(TranslationHistory history) {
        if (history == null) {
            return null;
        }

        TranslationHistoryResponse response = new TranslationHistoryResponse();
        response.setId(history.getId());
        response.setTaskId(history.getTaskId());
        response.setType(history.getType());
        response.setSourceLang(history.getSourceLang());
        response.setTargetLang(history.getTargetLang());
        response.setSourceTextPreview(history.getSourceText() != null
                ? history.getSourceText().substring(0, Math.min(100, history.getSourceText().length()))
                : null);
        response.setTargetTextPreview(history.getTargetText() != null
                ? history.getTargetText().substring(0, Math.min(100, history.getTargetText().length()))
                : null);
        response.setCreateTime(history.getCreateTime() != null
                ? history.getCreateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                : null);

        // 查询关联任务状态
        if (history.getTaskId() != null) {
            TranslationTask task = translationPort.findTaskByTaskId(history.getTaskId()).orElse(null);
            response.setStatus(task != null ? task.getStatus() : TranslationStatus.COMPLETED.getValue());
        } else {
            response.setStatus(TranslationStatus.COMPLETED.getValue());
        }

        // 查询关联文档名称
        if (history.getDocumentId() != null) {
            Document doc = documentPort.findById(history.getDocumentId()).orElse(null);
            if (doc != null) {
                response.setDocumentName(doc.getName());
            }
        }

        // fallback:如果documentName为空,尝试从任务关联的文档获取
        if (response.getDocumentName() == null && history.getTaskId() != null) {
            TranslationTask task = translationPort.findTaskByTaskId(history.getTaskId()).orElse(null);
            if (task != null && task.getDocumentId() != null) {
                Document doc = documentPort.findById(task.getDocumentId()).orElse(null);
                if (doc != null) {
                    response.setDocumentName(doc.getName());
                }
            }
        }

        // 最终fallback:使用任务类型作为名称
        if (response.getDocumentName() == null) {
            response.setDocumentName("document".equals(history.getType()) ? "文档翻译" : "文本翻译");
        }

        return response;
    }

    /**
     * SSE 流式文档翻译(基于已上传文档)
     * 读取磁盘文件 → 分段翻译 → 逐段推送 SSE → 更新文档状态
     */
    public SseEmitter streamTranslateDocumentById(Long docId, String targetLang, String mode) {
        SseEmitter emitter = SseEmitterUtil.createSseEmitter(300_000L);
        ObjectMapper mapper = new ObjectMapper();

        Thread.startVirtualThread(() -> {
            try {
                Document doc = documentPort.findById(docId).orElse(null);
                if (doc == null) {
                    SseEmitterUtil.sendError(emitter, "文档不存在");
                    SseEmitterUtil.complete(emitter);
                    return;
                }

                // 创建翻译任务
                TranslationTask task = createDocumentTask(doc.getUserId(), doc);
                doc.setStatus(TranslationStatus.PROCESSING.getValue());
                doc.setUpdateTime(LocalDateTime.now());
                documentPort.update(doc);
                updateTaskProgress(task, TranslationStatus.PROCESSING, 10, null);

                // 读取文件内容
                String content = readDocumentContent(doc.getPath(), doc.getFileType());
                if (content == null || content.trim().isEmpty()) {
                    updateTaskProgress(task, TranslationStatus.FAILED, 0, "文件内容为空");
                    SseEmitterUtil.sendError(emitter, "文件内容为空");
                    SseEmitterUtil.complete(emitter);
                    return;
                }

                // 按段落分割翻译
                String[] paragraphs = content.split("(?<=\n)");
                int total = paragraphs.length;
                StringBuilder translatedContent = new StringBuilder();

                for (int i = 0; i < total; i++) {
                    // 检查任务是否已被取消
                    TranslationTask currentTask = translationPort.findTaskByTaskId(task.getTaskId()).orElse(null);
                    if (currentTask != null && TranslationStatus.FAILED.getValue().equals(currentTask.getStatus())) {
                        log.info("翻译任务已被取消,提前退出 [taskId={}]", task.getTaskId());
                        SseEmitterUtil.complete(emitter);
                        return;
                    }

                    String paragraph = paragraphs[i];
                    if (paragraph.trim().isEmpty()) {
                        translatedContent.append(paragraph);
                        continue;
                    }

                    String textId = "seg_" + i;
                    try {
                        String result;
                        if ("expert".equals(mode)) {
                            TranslationPipeline pipeline = new TranslationPipeline(
                                    cachePort, ragTranslationService, entityConsistencyService,
                                    translationClientPort, postProcessingService, null, null);
                            result = pipeline.execute(paragraph, targetLang, TranslationMode.EXPERT);
                        } else {
                            result = translationClientPort.translate(
                                    paragraph, targetLang, "google", false, true, List.of(), null, null);
                        }
                        String translation = ExternalResponseUtil.extractDataField(result);
                        if (translation == null || translation.isEmpty()) {
                            log.warn("翻译结果为空,保留原文");
                            translation = paragraph;
                        }
                        // 补回原文中的换行符以保持格式对齐
                        if (!translation.endsWith("\n") && !translation.endsWith("\r")) {
                            if (paragraph.endsWith("\r\n")) {
                                translation += "\r\n";
                            } else if (paragraph.endsWith("\n")) {
                                translation += "\n";
                            } else if (paragraph.endsWith("\r")) {
                                translation += "\r";
                            }
                        }
                        translatedContent.append(translation);

                        Map<String, Object> eventData = new HashMap<>();
                        eventData.put("textId", textId);
                        eventData.put("original", paragraph);
                        eventData.put("translation", translation);
                        eventData.put("progress", (int) (((i + 1) * 100.0) / total));

                        SseEmitterUtil.sendData(emitter, mapper.writeValueAsString(eventData));
                    } catch (Exception e) {
                        log.warn("段落翻译失败 [{}]: {}", i, e.getMessage());
                        translatedContent.append(paragraph);
                        Map<String, Object> errorData = new HashMap<>();
                        errorData.put("textId", textId);
                        errorData.put("original", paragraph);
                        errorData.put("translation", paragraph);
                        errorData.put("error", true);
                        errorData.put("progress", (int) (((i + 1) * 100.0) / total));
                        SseEmitterUtil.sendData(emitter, mapper.writeValueAsString(errorData));
                    }

                    int progress = 20 + (int) (((i + 1) * 80.0) / total);
                    updateTaskProgress(task, TranslationStatus.PROCESSING, progress, null);
                }

                // 保存翻译结果到文件
                String translatedPath = ExternalResponseUtil.buildTranslatedPath(doc.getPath());
                Files.write(Paths.get(translatedPath), translatedContent.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));

                // 更新文档和任务状态
                doc.setStatus(TranslationStatus.COMPLETED.getValue());
                doc.setCompletedTime(LocalDateTime.now());
                doc.setUpdateTime(LocalDateTime.now());
                documentPort.update(doc);

                updateTaskProgress(task, TranslationStatus.COMPLETED, 100, null);
                saveTranslationHistory(task, content, translatedContent.toString());

                SseEmitterUtil.sendDone(emitter);
                SseEmitterUtil.complete(emitter);

            } catch (Exception e) {
                log.error("流式文档翻译失败: {}", e.getMessage(), e);
                SseEmitterUtil.sendError(emitter, "翻译失败:" + e.getMessage());
                SseEmitterUtil.complete(emitter);
            }
        });

        return emitter;
    }

    /**
     * SSE 流式文档翻译
     * 读取文件 → 分段翻译 → 逐段推送 SSE
     */
    public SseEmitter streamTranslateDocument(MultipartFile file, String sourceLang, String targetLang, String mode) {
        SseEmitter emitter = SseEmitterUtil.createSseEmitter(300_000L);
        ObjectMapper mapper = new ObjectMapper();

        Thread.startVirtualThread(() -> {
            try {
                String fileName = file.getOriginalFilename();
                String fileType = fileName != null ? fileName.substring(fileName.lastIndexOf('.') + 1).toLowerCase() : "";
                String content = readMultipartFileContent(file, fileType);
                if (content == null || content.trim().isEmpty()) {
                    SseEmitterUtil.sendError(emitter, "文件内容为空");
                    SseEmitterUtil.complete(emitter);
                    return;
                }

                String[] paragraphs = content.split("(?<=\n)");
                int total = paragraphs.length;
                int translated = 0;

                for (int i = 0; i < total; i++) {
                    String paragraph = paragraphs[i];
                    if (paragraph.trim().isEmpty()) {
                        translated++;
                        continue;
                    }

                    String textId = "seg_" + i;
                    try {
                        String result;
                        if ("expert".equals(mode)) {
                            TranslationPipeline pipeline = new TranslationPipeline(
                                    cachePort, ragTranslationService, entityConsistencyService,
                                    translationClientPort, postProcessingService, null, null);
                            result = pipeline.execute(paragraph, targetLang, TranslationMode.EXPERT);
                        } else {
                            result = translationClientPort.translate(
                                    paragraph, targetLang, "google", false, true, List.of(), null, null);
                        }

                        // 提取实际翻译内容
                        String translation = ExternalResponseUtil.extractDataField(result);
                        if (translation == null || translation.isEmpty()) {
                            log.warn("翻译结果为空,保留原文");
                            translation = paragraph;
                        }
                        // 补回原文中的换行符以保持格式对齐
                        if (!translation.endsWith("\n") && !translation.endsWith("\r")) {
                            if (paragraph.endsWith("\r\n")) {
                                translation += "\r\n";
                            } else if (paragraph.endsWith("\n")) {
                                translation += "\n";
                            } else if (paragraph.endsWith("\r")) {
                                translation += "\r";
                            }
                        }

                        Map<String, Object> eventData = new HashMap<>();
                        eventData.put("textId", textId);
                        eventData.put("original", paragraph);
                        eventData.put("translation", translation);
                        eventData.put("progress", (int) (((i + 1) * 100.0) / total));

                        SseEmitterUtil.sendData(emitter, mapper.writeValueAsString(eventData));
                    } catch (Exception e) {
                        log.warn("段落翻译失败 [{}]: {}", i, e.getMessage());
                        Map<String, Object> errorData = new HashMap<>();
                        errorData.put("textId", textId);
                        errorData.put("original", paragraph);
                        errorData.put("translation", paragraph);
                        errorData.put("error", true);
                        errorData.put("progress", (int) (((i + 1) * 100.0) / total));
                        SseEmitterUtil.sendData(emitter, mapper.writeValueAsString(errorData));
                    }

                    translated++;
                }

                SseEmitterUtil.sendDone(emitter);
                SseEmitterUtil.complete(emitter);

            } catch (Exception e) {
                log.error("流式文档翻译失败: {}", e.getMessage(), e);
                SseEmitterUtil.sendError(emitter, "翻译失败:" + e.getMessage());
                SseEmitterUtil.complete(emitter);
            }
        });

        return emitter;
    }

    /**
     * 读取 MultipartFile 内容
     */
    private String readMultipartFileContent(MultipartFile file, String fileType) throws IOException {
        byte[] bytes = file.getBytes();
        String content = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);

        if ("txt".equals(fileType)) {
            return content;
        } else {
            throw new IOException("暂不支持 " + fileType.toUpperCase() + " 格式,仅支持 TXT 文件");
        }
    }

    /**
     * 提取字符串末尾的所有换行符
     * 用于在翻译后恢复原文的格式
     */
    private static String getTrailingNewline(String text) {
        if (text == null || text.isEmpty()) {
            return "";
        }
        int end = text.length();
        while (end > 0) {
            char c = text.charAt(end - 1);
            if (c != '\n' && c != '\r') {
                break;
            }
            end--;
        }
        return text.substring(end);
    }

    /**
     * 定时清理卡死的任务(每 5 分钟执行一次)
     * 将超过 30 分钟仍处于 PENDING/PROCESSING 状态的任务标记为 FAILED
     */
    @Scheduled(fixedRate = 300000)
    public void cleanupStuckTasks() {
        LocalDateTime cutoff = LocalDateTime.now().minusMinutes(30);
        List<TranslationTask> stuckTasks = translationPort.findTasksByStatusAndCreateTimeBefore(
                TranslationStatus.PROCESSING.getValue(), cutoff);
        for (TranslationTask task : stuckTasks) {
            log.warn("清理卡死任务: taskId={}, createTime={}", task.getTaskId(), task.getCreateTime());
            task.setStatus(TranslationStatus.FAILED.getValue());
            task.setErrorMessage("任务超时,自动标记为失败");
            translationPort.updateTask(task);
            // 同步更新文档状态
            if (task.getDocumentId() != null) {
                Document doc = documentPort.findById(task.getDocumentId()).orElse(null);
                if (doc != null && TranslationStatus.PROCESSING.getValue().equals(doc.getStatus())) {
                    doc.setStatus(TranslationStatus.FAILED.getValue());
                    doc.setErrorMessage("任务超时,自动标记为失败");
                    doc.setUpdateTime(LocalDateTime.now());
                    documentPort.update(doc);
                }
            }
        }
    }

    /**
     * 应用关闭时优雅关闭异步翻译线程
     */
    @PreDestroy
    public void shutdown() {
        log.info("TranslationTaskApplicationService 关闭中,等待异步翻译任务完成...");
        // 虚拟线程由 JVM 自动管理,无需显式关闭
        log.info("TranslationTaskApplicationService 已关闭");
    }
}