在《disruptor(一) 单一生产者和WorkPool消费者源码阅读》介绍了单一生产者
当多个生产者向RingBuffer中写入数据时,创建Disruptor时要修改对应的参数:
Disruptor disruptor = new Disruptor(eventFactory, BUFFER_SIZE, executor, ProducerType.MULTI, new YieldingWaitStrategy ());
ProducerType.MULTI 使用这个参数,单一生产者时,Sequencer接口的实现是SingleProducerSequencer;当是多个线程写入时,使用另一个实现MultiProducerSequencer
在写入获取RingBuffer的序列号的实现:
/** * @see Sequencer#next(int) */ @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } //生产者当前写入到的序列号 long current; //下一个序列号 long next; //while循环主要服务CAS算法,不成功就重来 do { //获取生产者最新的序列号 current = cursor.get(); //要获取的序列号 next = current + n; //wrapPoint是一个很关键的变量,这个变量决定生产者是否可以覆盖序列号nextSequence,wrapPoint是为什么是nextSequence - bufferSize;RingBuffer表现出来的是一个环形的数据结构,实际上是一个长度为bufferSize的数组, //nextSequence - bufferSize如果nextSequence小于bufferSize wrapPoint是负数,表示可以一直生产;如果nextSequence大于bufferSize wrapPoint是一个大于0的数,由于生产者和消费者的序列号差距不能超过bufferSize //(超过bufferSize会覆盖消费者未消费的数据),wrapPoint要小于等于多个消费者线程中消费的最小的序列号,即cachedGatingSequence的值,这就是下面if判断的根据 long wrapPoint = next - bufferSize; //cachedGatingSequence, gatingSequenceCache这两个变量记录着上一次获取消费者中最小的消费序列号 long cachedGatingSequence = gatingSequenceCache.get(); //生产者不能继续写入,否则会覆盖消费者未消费的数据 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { //获取最新的消费者最小的消费序号 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); //依然不能满足写入条件(写入会覆盖为消费的数据) if (wrapPoint > gatingSequence) { //锁一会,结束本次循环,重来 LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } //缓存一下消费者中最小的消费序列号 gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) //满足消费条件,有空余的空间让生产者写入,使用CAS算法,成功则跳出本次循环,不成功则重来 { break; } } while (true); return next; }
MultiProducerSequencer和SingleProducerSequencer next方法的区别就在于多了一个CAS算法,获取消费者最小序列号的while循环和放到外面和CAS的while循环合并
相关推荐
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。 可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是...
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
简单讲解disruptor并附上demo
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最...
Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...
disruptor 多个消费者 但是只消费一次 有时候会有这样的需求
Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。
disruptor.jar 2018最新版本(包含disruptor-3.4.1.jar、disruptor-3.4.1-sources.jar、disruptor-3.4.1-javadoc.jar)
网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。
springboot集成disruptor,自动配置,便于快速开发,隐藏disruptor复杂的使用,线程池的频繁创建,优化开发。
Disruptor报错FatalExceptionHandler的解决办法,看网上这种解决办法挺少,整理了一下,分析了一下Dsiruptor的源码,并给出了解决方案