diff --git a/ruoyi-bussiness/src/main/java/com/ruoyi/cms/service/impl/ESJobSearchImpl.java b/ruoyi-bussiness/src/main/java/com/ruoyi/cms/service/impl/ESJobSearchImpl.java index 5fef9b7..15ba33d 100644 --- a/ruoyi-bussiness/src/main/java/com/ruoyi/cms/service/impl/ESJobSearchImpl.java +++ b/ruoyi-bussiness/src/main/java/com/ruoyi/cms/service/impl/ESJobSearchImpl.java @@ -13,6 +13,7 @@ import com.ruoyi.cms.service.IESJobSearchService; import com.ruoyi.cms.util.ListUtil; import com.ruoyi.cms.util.StringUtil; import com.ruoyi.common.core.domain.entity.Company; +import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.core.text.Convert; import com.ruoyi.common.utils.SiteSecurityUtils; import com.ruoyi.common.utils.bean.BeanUtils; @@ -28,6 +29,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -48,6 +50,16 @@ public class ESJobSearchImpl implements IESJobSearchService private AppUserServiceImpl appUserService; @Autowired private ICompanyService iCompanyService; + @Autowired + private RedisCache redisCache; + + // 锁的key(唯一标识ES索引初始化) + private static final String ES_INIT_LOCK_KEY = "es:job_document:init:lock"; + // 锁过期时间(30分钟,确保初始化完成) + private static final Integer LOCK_EXPIRE_SECONDS = 1800; + // 等待锁时间(5分钟,避免无限等待) + private static final Integer WAIT_LOCK_SECONDS = 300; + @Autowired private BussinessDictDataServiceImpl bussinessDictDataServicel; @@ -58,7 +70,52 @@ public class ESJobSearchImpl implements IESJobSearchService @PostConstruct public void init() { - resetTextCache(); + boolean isLockAcquired = false; + try { + isLockAcquired = acquireDistributedLock(); + if (isLockAcquired) { + resetTextCache(); + } else { + logger.info("其他节点正在初始化ES索引,直接复用,无需重复执行"); + } + } catch (InterruptedException e) { + logger.error("ES索引初始化等待锁异常", e); + Thread.currentThread().interrupt(); + } finally { + if (isLockAcquired) { + releaseDistributedLock(); + } + } + } + + /** + * 基于自定义RedisCache实现分布式锁(无getRedisTemplate适配版) + */ + private boolean acquireDistributedLock() throws InterruptedException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < WAIT_LOCK_SECONDS * 1000) { + boolean lockExists = redisCache.hasKey(ES_INIT_LOCK_KEY); + if (!lockExists) { + redisCache.setCacheObject(ES_INIT_LOCK_KEY, "es_init_locked", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS); + logger.info("成功获取ES初始化分布式锁,key:{}", ES_INIT_LOCK_KEY); + return true; + } + TimeUnit.MILLISECONDS.sleep(500); + } + logger.warn("等待{}秒未获取到ES初始化锁,放弃执行", WAIT_LOCK_SECONDS); + return false; + } + + /** + * 释放分布式锁(用自定义RedisCache的deleteObject方法) + */ + private void releaseDistributedLock() { + try { + redisCache.deleteObject(ES_INIT_LOCK_KEY); + logger.info("已释放ES初始化分布式锁,key:{}", ES_INIT_LOCK_KEY); + } catch (Exception e) { + logger.error("释放ES初始化锁异常", e); + } } @Override @@ -79,8 +136,24 @@ public class ESJobSearchImpl implements IESJobSearchService public void resetTextCache() { logger.info("正在重新刷新es"); // 删除并重新创建索引 - esJobDocumentMapper.deleteIndex("job_document"); - esJobDocumentMapper.createIndex(); + /*esJobDocumentMapper.deleteIndex("job_document"); + esJobDocumentMapper.createIndex();*/ + if (esJobDocumentMapper.existsIndex("job_document")) { + esJobDocumentMapper.deleteIndex("job_document"); + logger.info("已删除原有job_document索引"); + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + logger.error("删除索引后休眠异常", e); + } + } + + if (!esJobDocumentMapper.existsIndex("job_document")) { + esJobDocumentMapper.createIndex(); + logger.info("已创建job_document索引"); + } else { + logger.info("索引已被其他节点创建,直接复用"); + } // 分批次处理数据 int batchSize = 1000; // 每批次处理的数据量