异步上传石墨文件进度条前端展示记录(采用Redis中List数据结构实现)🔥

5/10/2025 异步任务

上篇文章说到,之前使用Redis的String数据结构进行存储异步上传石墨文档的任务状态,做法有些性能上的问题。

下面简单列举一下采用String数据结构进行存储的劣势:

  1. 缺少历史记录:无法追踪任务执行的完整过程、只能获取最新状态,丢失中间状态信息

  2. 并发处理:在高并发场景下需要额外考虑乐观锁等机制避免数据覆盖、需要使用WATCH命令或Lua脚本确保原子性

  3. 功能局限:不支持队列操作,无法实现基于队列的分布式处理、不适合需要按顺序处理的场景

# 采用Redis的LIst数据结构或者String数据结构如何选择?

# 适合使用List数据结构

  • 需要完整记录任务执行历史
  • 需要按时间顺序查看任务状态变化
  • 任务执行次数有限,存储空间不是主要考虑因素
  • 需要支持分布式任务处理

# 适合使用String数据结构

  • 任务更新频繁,存储空间是关键考虑因素

  • 系统并发量大,需要最高的读写性能

  • 只关注任务的最新状态

  • 任务状态简单,不需要复杂的历史记录

# 进度条实现逻辑简图

下图简单说明了进度条大致的逻辑,进度条的更新进度和具体业务的步骤进行绑定,当然下图是主流程简化版本。

# 完整流程逻辑图

# 如何使用Redis的List结构进行操作

创建一个操作Redis的工具类,需要在工具类中定义于业务相关的属性字段信息,定义多个构造方法进行存储需要更新字段信息。很关键需要直接使用对象进行直接存储,避免采用JSON格式化方式,JSON格式化读-修改-写问题:当多个线程同时读取、修改和写入同一JSON时,可能导致数据不一。部分更新问题:当只需更新对象的部分字段时,使用JSON需要先读取整个对象,再修改,再写回。

# 利用Redis的List数据结构存储

    /**
     * 将任务状态添加到Redis列表中
     * @param redisTemplate Redis模板
     */
    public void addTaskToList(RedisTemplate<String, Object> redisTemplate) {
        String taskKey = this.findTaskCacheKey();
        redisTemplate.opsForList().leftPush(taskKey, this);
        redisTemplate.expire(taskKey, 1, TimeUnit.DAYS);
    }



    /**
     * 更新任务进度
     * @param status 状态
     * @param msg 消息
     * @param addPercent 增加的进度百分比
     * @param redisTemplate Redis模板(使用这个进行存储调用)
     */
    public void commonUpdate(String status, String msg, Integer addPercent, RedisTemplate<String, Object> redisTemplate) {
        this.commonUpdate(status, msg, addPercent, null, redisTemplate);
    }


    /**
         * 从Redis中清理任务进度记录
         * @param taskId 任务ID
         * @param userCode 用户编码
         * @param targetStatus 目标状态(SUCCESS或FAILED)- 只保留这个状态的记录,若为null则删除所有记录
         */
    private void clearTaskProgressRecords(String taskId, String userCode, String targetStatus) {
        try {
            // 获取用户任务列表的键
            String userTasksKey = String.format("%s:%s", TASK_PROCESS_PREFIX_KEY, userCode);

            // 获取当前任务列表
            List<Object> tasksList = redisTemplate.opsForList().range(userTasksKey, 0, -1);
            if (tasksList != null && !tasksList.isEmpty()) {
                // 收集需要删除的元素和需要保留的元素
                List<Object> toRemove = new ArrayList<>();
                Object targetRecord = null;

                for (Object taskObj : tasksList) {
                    try {
                        // 检查对象类型
                        if (taskObj instanceof xxxx) {
                            LongTaskProcessResponse task = (xxx) taskObj;
                            String currentTaskId = task.getTaskId();
                            String status = task.getStatus();

                            // 如果找到匹配的任务ID
                            if (taskId.equals(currentTaskId)) {
                                // 如果指定了目标状态,检查是否匹配
                                if (targetStatus != null && targetStatus.equals(status)) {
                                    // 保留目标状态的记录
                                    targetRecord = taskObj;
                                } else {
                                    // 删除非目标状态的记录
                                    toRemove.add(taskObj);
                                }
                            }
                        } else {
                            log.warn("任务对象类型不正确,无法处理:{}",
                                    taskObj != null ? taskObj.getClass().getName() : "null");
                        }
                    } catch (Exception e) {
                        log.warn("处理任务对象失败: {}", e.getMessage());
                    }
                }

                // 删除收集到的所有元素
                for (Object obj : toRemove) {
                    redisTemplate.opsForList().remove(userTasksKey, 0, obj);
                }

                // 如果目标记录存在,确保它在列表的最前面(最新)
                if (targetRecord != null) {
                    // 先删除,再添加到列表头部,确保是最新的记录
                    redisTemplate.opsForList().remove(userTasksKey, 0, targetRecord);
                    redisTemplate.opsForList().leftPush(userTasksKey, targetRecord);
                }

                if (!toRemove.isEmpty()) {
                    log.info("从Redis中清理任务进度记录,userCode: {}, taskId: {}, 删除记录数: {}, 保留状态: {}",
                            userCode, taskId, toRemove.size(), targetStatus);
                }
            }
        } catch (Exception e) {
            log.error("从Redis中清理任务进度记录失败,taskId: {}, userCode: {}", taskId, userCode, e);
        }
    }

# 利用Java特性进行存储Redis

  • this自动引用的就是调用该方法的failedResponse对象
  • 方法中的this不需要显式传递,它是Java方法调用机制自动提供的
  • 当执行leftPush(taskKey, this)时,传入Redis的就是整个failedResponse对象
                // 创建一个纯粹的失败状态记录
                LongTaskProcessResponse failedResponse = new LongTaskProcessResponse();
                failedResponse.setTaskId(subTaskResponse.getTaskId());
                failedResponse.setBusinessType(subTaskResponse.getBusinessType());
                failedResponse.setStatus(KbProcessStatus.FAILED.name());
                failedResponse.setMsg("处理失败: " + e.getMessage());
                failedResponse.setProcessPercent(new BigDecimal(0));
                failedResponse.setUserCode(currentUser.getCode());
                failedResponse.setTitle(subTaskResponse.getTitle());
                failedResponse.setCreateTime(System.currentTimeMillis());
                failedResponse.setExtraData(subTaskResponse.getExtraData());

                // 添加失败记录
                failedResponse.addTaskToList(redisTemplate);

    /**
     * 将任务状态添加到Redis列表中
     * @param redisTemplate Redis模板
     */
    public void addTaskToList(RedisTemplate<String, Object> redisTemplate) {
        String taskKey = this.findTaskCacheKey();
        redisTemplate.opsForList().leftPush(taskKey, this);
        redisTemplate.expire(taskKey, 1, TimeUnit.DAYS);
    }
    我很快乐-周兴哲
    致逝去的青春