博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
elastic-job 和springboot 集成干货
阅读量:4072 次
发布时间:2019-05-25

本文共 5913 字,大约阅读时间需要 19 分钟。

elastic-job  是一个很好的分布式任务框架,但是中间有一个坑,正在研究如何跳出。

      一个任务分2片,先启动第一个,可以看到2片都是他执行,在启动第二片,第二个上线的服务直接帮他开始干活,一人一片,当第二个服务宕机后,第一个服务无法及时监控到宕机状态,没有干2片的活只是干一片的活,等了很久之后才发现第二个兄弟已经挂了,只有自己赡养双亲了。

 

  原因分析:

                     elastic job是基于zookeeper监控其他的实例状态的,所以可能是zookeeper session问题,于是看zookeeper的日志,发现当一个实例上线后,创建了2个session id,我尝试着设置 session的timeout时间,发现只能设置第一个session失效时间,第二个死活设置不了,第二个是按照zookeeper session最大的session 存活时间设置的。。是40s, 而 elastic job 是监控的第二个session 断开后,执行的重新分片业务逻辑。。。

                       跟踪代码到了CuratorFramework ,第二个session好像是他创建的,目前不清楚如何解决。

 

    原因找到了:

                           千万不要用starter,用我贴的代码自己改改,用starter没办法手动指定session过期时间,看着别人的教程中自己new了一个 ZookeeperRegistryCenter,加上starter带的那个就有2个了。

 

POM

  

org.springframework.boot
spring-boot-starter-parent
1.5.13.RELEASE
UTF-8
UTF-8
1.8
com.dangdang
elastic-job-lite-core
2.1.5
com.dangdang
elastic-job-lite-spring
2.1.5
com.alibaba
druid-spring-boot-starter
1.1.2
org.springframework.boot
spring-boot-autoconfigure
mysql
mysql-connector-java
org.springframework.boot
spring-boot-starter-jdbc

配置类:

               

@Configurationpublic class ElasticJobAutoConfiguration {    @Value("${elaticjob.zookeeper.server-lists}")    private String serverList;    @Value("${elaticjob.zookeeper.namespace}")    private String namespace;   @Value("${elaticjob.zookeeper.session-timeout-milliseconds}")   private int sessionTimeoutMilliseconds;    @Autowired    private ApplicationContext applicationContext;    @PostConstruct    public void initElasticJob(){        ZookeeperConfiguration config = new ZookeeperConfiguration(serverList, namespace);        config.setSessionTimeoutMilliseconds(sessionTimeoutMilliseconds);        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(config);        regCenter.init();        Map
map = applicationContext.getBeansOfType(SimpleJob.class); for(Map.Entry
entry : map.entrySet()){ SimpleJob simpleJob = entry.getValue(); ElasticSimpleJob elasticSimpleJobAnnotation=simpleJob.getClass().getAnnotation(ElasticSimpleJob.class); String cron= StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value()); SimpleJobConfiguration simpleJobConfiguration=new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName()); LiteJobConfiguration liteJobConfiguration=LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build(); String dataSourceRef=elasticSimpleJobAnnotation.dataSource(); if(StringUtils.isNotBlank(dataSourceRef)){ if(!applicationContext.containsBean(dataSourceRef)){ throw new RuntimeException("not exist datasource ["+dataSourceRef+"] !"); } DataSource dataSource=(DataSource)applicationContext.getBean(dataSourceRef); JobEventRdbConfiguration jobEventRdbConfiguration=new JobEventRdbConfiguration(dataSource); SpringJobScheduler jobScheduler=new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration,jobEventRdbConfiguration); jobScheduler.init(); }else{ SpringJobScheduler jobScheduler=new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration); jobScheduler.init(); } } } /** * 设置活动监听,前提是已经设置好了监听,见下一个目录 * @return */ @Bean public ElasticJobListener elasticJobListener() { return new ElasticJobListener(100, 100); }}

 

@ElasticSimpleJob(cron = "0/6 * * * * ?",        jobName = "firstJob",        shardingTotalCount = 2,        jobParameter = "测试参数",        shardingItemParameters = "0=A,1=B",        dataSource = "datasource")@Componentpublic class MyJob implements SimpleJob {    @Override    public void execute(ShardingContext shardingContext) {        System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, " +                        "当前分片项: %s,当前参数: %s," +                        "当前任务名称: %s,当前任务参数: %s,"+                        "当前任务的id: %s"                ,                //获取当前线程的id                Thread.currentThread().getId(),                //获取任务总片数                shardingContext.getShardingTotalCount(),                //获取当前分片项                shardingContext.getShardingItem(),                //获取当前的参数                shardingContext.getShardingParameter(),                //获取当前的任务名称                shardingContext.getJobName(),                //获取当前任务参数                shardingContext.getJobParameter(),                //获取任务的id                shardingContext.getTaskId()        ));    }}

   注解

@Target({ ElementType.TYPE })@Retention(RetentionPolicy.RUNTIME)public @interface ElasticSimpleJob {    @AliasFor("cron")    public abstract String value() default "";    @AliasFor("value")    public abstract String cron() default "";    public abstract String jobName() default "";    public abstract int shardingTotalCount() default 1;    public abstract String shardingItemParameters() default "";    public abstract String jobParameter() default "";    public abstract String dataSource() default "";    public abstract String description() default "";    public abstract boolean disabled() default false;    public abstract boolean overwrite() default true;}

  如果懒得一个个copy,就直接下载吧

转载地址:http://tgwni.baihongyu.com/

你可能感兴趣的文章
Netty框架
查看>>
线程同步(C# 编程指南)
查看>>
flex addChild 的一个小细节
查看>>
Adobe Flash gets its full launch on Android
查看>>
java.nio.BufferOverflowException
查看>>
对于大型公司项目平台选择j2ee的几层认识(二)
查看>>
flash player10 Vector类型
查看>>
德克萨斯扑克初级玩家必胜玩法
查看>>
Flex数据绑定陷阱:常见的误用和错误(一) - 闪吧教材.jpg
查看>>
alchemy的Box2D版本
查看>>
Alchemy简单入门教程(FlashCS4环境)
查看>>
AS3的深度管理及排序
查看>>
翻译:Adobe AIR 2.6的新特性
查看>>
puremvc多核版与单核版的区别
查看>>
详细说说ActionScript中function的call()方法和apply()方法
查看>>
WebBase(基于AS3的Flash全站基架)
查看>>
loader如果你提前设width或height,loadComplete后显示不出来
查看>>
如果Stage不是NoScale模式,那么接收不到Event.Resize事件
查看>>
cygwin高速下载网址
查看>>
Flash调用Alchemy编译的代码时出现Error #1506的解决
查看>>