本文共 5913 字,大约阅读时间需要 19 分钟。
elastic-job 是一个很好的分布式任务框架,但是中间有一个坑,正在研究如何跳出。
一个任务分2片,先启动第一个,可以看到2片都是他执行,在启动第二片,第二个上线的服务直接帮他开始干活,一人一片,当第二个服务宕机后,第一个服务无法及时监控到宕机状态,没有干2片的活只是干一片的活,等了很久之后才发现第二个兄弟已经挂了,只有自己赡养双亲了。
原因分析:
elastic job是基于zookeeper监控其他的实例状态的,所以可能是zookeeper session问题,于是看zookeeper的日志,发现当一个实例上线后,创建了2个session id,我尝试着设置 session的timeout时间,发现只能设置第一个session失效时间,第二个死活设置不了,第二个是按照zookeeper session最大的session 存活时间设置的。。是40s, 而 elastic job 是监控的第二个session 断开后,执行的重新分片业务逻辑。。。
跟踪代码到了CuratorFramework ,第二个session好像是他创建的,目前不清楚如何解决。
原因找到了:
千万不要用starter,用我贴的代码自己改改,用starter没办法手动指定session过期时间,看着别人的教程中自己new了一个 ZookeeperRegistryCenter,加上starter带的那个就有2个了。
POM
org.springframework.boot spring-boot-starter-parent 1.5.13.RELEASE UTF-8 UTF-8 1.8 com.dangdang elastic-job-lite-core 2.1.5 com.dangdang elastic-job-lite-spring 2.1.5 com.alibaba druid-spring-boot-starter 1.1.2 org.springframework.boot spring-boot-autoconfigure mysql mysql-connector-java org.springframework.boot spring-boot-starter-jdbc
配置类:
@Configurationpublic class ElasticJobAutoConfiguration { @Value("${elaticjob.zookeeper.server-lists}") private String serverList; @Value("${elaticjob.zookeeper.namespace}") private String namespace; @Value("${elaticjob.zookeeper.session-timeout-milliseconds}") private int sessionTimeoutMilliseconds; @Autowired private ApplicationContext applicationContext; @PostConstruct public void initElasticJob(){ ZookeeperConfiguration config = new ZookeeperConfiguration(serverList, namespace); config.setSessionTimeoutMilliseconds(sessionTimeoutMilliseconds); ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(config); regCenter.init(); Mapmap = applicationContext.getBeansOfType(SimpleJob.class); for(Map.Entry entry : map.entrySet()){ SimpleJob simpleJob = entry.getValue(); ElasticSimpleJob elasticSimpleJobAnnotation=simpleJob.getClass().getAnnotation(ElasticSimpleJob.class); String cron= StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value()); SimpleJobConfiguration simpleJobConfiguration=new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration=LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build(); String dataSourceRef=elasticSimpleJobAnnotation.dataSource(); if(StringUtils.isNotBlank(dataSourceRef)){ if(!applicationContext.containsBean(dataSourceRef)){ throw new RuntimeException("not exist datasource ["+dataSourceRef+"] !"); } DataSource dataSource=(DataSource)applicationContext.getBean(dataSourceRef); JobEventRdbConfiguration jobEventRdbConfiguration=new JobEventRdbConfiguration(dataSource); SpringJobScheduler jobScheduler=new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration,jobEventRdbConfiguration); jobScheduler.init(); }else{ SpringJobScheduler jobScheduler=new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration); jobScheduler.init(); } } } /** * 设置活动监听,前提是已经设置好了监听,见下一个目录 * @return */ @Bean public ElasticJobListener elasticJobListener() { return new ElasticJobListener(100, 100); }}
@ElasticSimpleJob(cron = "0/6 * * * * ?", jobName = "firstJob", shardingTotalCount = 2, jobParameter = "测试参数", shardingItemParameters = "0=A,1=B", dataSource = "datasource")@Componentpublic class MyJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " + "当前分片项: %s,当前参数: %s," + "当前任务名称: %s,当前任务参数: %s,"+ "当前任务的id: %s" , //获取当前线程的id Thread.currentThread().getId(), //获取任务总片数 shardingContext.getShardingTotalCount(), //获取当前分片项 shardingContext.getShardingItem(), //获取当前的参数 shardingContext.getShardingParameter(), //获取当前的任务名称 shardingContext.getJobName(), //获取当前任务参数 shardingContext.getJobParameter(), //获取任务的id shardingContext.getTaskId() )); }}
注解
@Target({ ElementType.TYPE })@Retention(RetentionPolicy.RUNTIME)public @interface ElasticSimpleJob { @AliasFor("cron") public abstract String value() default ""; @AliasFor("value") public abstract String cron() default ""; public abstract String jobName() default ""; public abstract int shardingTotalCount() default 1; public abstract String shardingItemParameters() default ""; public abstract String jobParameter() default ""; public abstract String dataSource() default ""; public abstract String description() default ""; public abstract boolean disabled() default false; public abstract boolean overwrite() default true;}
如果懒得一个个copy,就直接下载吧
转载地址:http://tgwni.baihongyu.com/