修改金蝶集群启动es
This commit is contained in:
@@ -13,6 +13,7 @@ import com.ruoyi.cms.service.IESJobSearchService;
|
|||||||
import com.ruoyi.cms.util.ListUtil;
|
import com.ruoyi.cms.util.ListUtil;
|
||||||
import com.ruoyi.cms.util.StringUtil;
|
import com.ruoyi.cms.util.StringUtil;
|
||||||
import com.ruoyi.common.core.domain.entity.Company;
|
import com.ruoyi.common.core.domain.entity.Company;
|
||||||
|
import com.ruoyi.common.core.redis.RedisCache;
|
||||||
import com.ruoyi.common.core.text.Convert;
|
import com.ruoyi.common.core.text.Convert;
|
||||||
import com.ruoyi.common.utils.SiteSecurityUtils;
|
import com.ruoyi.common.utils.SiteSecurityUtils;
|
||||||
import com.ruoyi.common.utils.bean.BeanUtils;
|
import com.ruoyi.common.utils.bean.BeanUtils;
|
||||||
@@ -28,6 +29,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -48,6 +50,16 @@ public class ESJobSearchImpl implements IESJobSearchService
|
|||||||
private AppUserServiceImpl appUserService;
|
private AppUserServiceImpl appUserService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ICompanyService iCompanyService;
|
private ICompanyService iCompanyService;
|
||||||
|
@Autowired
|
||||||
|
private RedisCache redisCache;
|
||||||
|
|
||||||
|
// 锁的key(唯一标识ES索引初始化)
|
||||||
|
private static final String ES_INIT_LOCK_KEY = "es:job_document:init:lock";
|
||||||
|
// 锁过期时间(30分钟,确保初始化完成)
|
||||||
|
private static final Integer LOCK_EXPIRE_SECONDS = 1800;
|
||||||
|
// 等待锁时间(5分钟,避免无限等待)
|
||||||
|
private static final Integer WAIT_LOCK_SECONDS = 300;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private BussinessDictDataServiceImpl bussinessDictDataServicel;
|
private BussinessDictDataServiceImpl bussinessDictDataServicel;
|
||||||
@@ -58,7 +70,52 @@ public class ESJobSearchImpl implements IESJobSearchService
|
|||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init()
|
public void init()
|
||||||
{
|
{
|
||||||
resetTextCache();
|
boolean isLockAcquired = false;
|
||||||
|
try {
|
||||||
|
isLockAcquired = acquireDistributedLock();
|
||||||
|
if (isLockAcquired) {
|
||||||
|
resetTextCache();
|
||||||
|
} else {
|
||||||
|
logger.info("其他节点正在初始化ES索引,直接复用,无需重复执行");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error("ES索引初始化等待锁异常", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
} finally {
|
||||||
|
if (isLockAcquired) {
|
||||||
|
releaseDistributedLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 基于自定义RedisCache实现分布式锁(无getRedisTemplate适配版)
|
||||||
|
*/
|
||||||
|
private boolean acquireDistributedLock() throws InterruptedException {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
while (System.currentTimeMillis() - start < WAIT_LOCK_SECONDS * 1000) {
|
||||||
|
boolean lockExists = redisCache.hasKey(ES_INIT_LOCK_KEY);
|
||||||
|
if (!lockExists) {
|
||||||
|
redisCache.setCacheObject(ES_INIT_LOCK_KEY, "es_init_locked", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);
|
||||||
|
logger.info("成功获取ES初始化分布式锁,key:{}", ES_INIT_LOCK_KEY);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
TimeUnit.MILLISECONDS.sleep(500);
|
||||||
|
}
|
||||||
|
logger.warn("等待{}秒未获取到ES初始化锁,放弃执行", WAIT_LOCK_SECONDS);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 释放分布式锁(用自定义RedisCache的deleteObject方法)
|
||||||
|
*/
|
||||||
|
private void releaseDistributedLock() {
|
||||||
|
try {
|
||||||
|
redisCache.deleteObject(ES_INIT_LOCK_KEY);
|
||||||
|
logger.info("已释放ES初始化分布式锁,key:{}", ES_INIT_LOCK_KEY);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("释放ES初始化锁异常", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -79,8 +136,24 @@ public class ESJobSearchImpl implements IESJobSearchService
|
|||||||
public void resetTextCache() {
|
public void resetTextCache() {
|
||||||
logger.info("正在重新刷新es");
|
logger.info("正在重新刷新es");
|
||||||
// 删除并重新创建索引
|
// 删除并重新创建索引
|
||||||
esJobDocumentMapper.deleteIndex("job_document");
|
/*esJobDocumentMapper.deleteIndex("job_document");
|
||||||
esJobDocumentMapper.createIndex();
|
esJobDocumentMapper.createIndex();*/
|
||||||
|
if (esJobDocumentMapper.existsIndex("job_document")) {
|
||||||
|
esJobDocumentMapper.deleteIndex("job_document");
|
||||||
|
logger.info("已删除原有job_document索引");
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error("删除索引后休眠异常", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!esJobDocumentMapper.existsIndex("job_document")) {
|
||||||
|
esJobDocumentMapper.createIndex();
|
||||||
|
logger.info("已创建job_document索引");
|
||||||
|
} else {
|
||||||
|
logger.info("索引已被其他节点创建,直接复用");
|
||||||
|
}
|
||||||
|
|
||||||
// 分批次处理数据
|
// 分批次处理数据
|
||||||
int batchSize = 1000; // 每批次处理的数据量
|
int batchSize = 1000; // 每批次处理的数据量
|
||||||
|
|||||||
Reference in New Issue
Block a user