feat: add Disruptor learning project

- Based on JDK 21 + Spring Boot 3.5.0
- Uses Disruptor 3.4.4
- Includes detailed comments and @see references
- Provides a RingBuffer core-feature demo
- 27 unit tests, all passing
- Adds a README
- Configures .gitignore to exclude non-project files

F嘉阳
2025-12-31 10:41:35 +08:00
commit c9600a1231
16 changed files with 3101 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,30 @@
# IDE
.idea/
*.iml
.vscode/
*.swp
*.swo
*~
# Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
# Logs
*.log
logs/
# OS
.DS_Store
Thumbs.db
# Spring Boot
spring-boot-*.log

README.md Normal file

@@ -0,0 +1,299 @@
# Disruptor Learning Project
An LMAX Disruptor learning project built on JDK 21 + Spring Boot 3, with detailed comments and unit tests to help you study the core principles of the Disruptor framework in depth.
## Overview
This project is a complete Disruptor learning example that demonstrates the framework's core features and best practices. Through working code and detailed comments, it helps developers understand:
- How the RingBuffer circular buffer works
- The event publish/subscribe model
- The sequence (Sequence) mechanism
- Choosing a wait strategy (WaitStrategy)
- Parallel and serial processing with multiple consumers
## Tech Stack
- **JDK**: 21
- **Spring Boot**: 3.5.0
- **Disruptor**: 3.4.4
- **Build tool**: Maven
- **Test framework**: JUnit 5
## Project Structure
```
disruptor-learn/
├── src/main/java/com/example/disruptor/
│   ├── event/
│   │   ├── OrderEvent.java            # Order event class
│   │   └── OrderEventFactory.java     # Event factory
│   ├── handler/
│   │   └── OrderEventHandler.java     # Event handler
│   ├── producer/
│   │   └── OrderEventProducer.java    # Event producer
│   ├── config/
│   │   └── DisruptorConfig.java       # Disruptor configuration
│   ├── controller/
│   │   └── OrderController.java       # REST API controller
│   ├── ringbuffer/
│   │   └── RingBufferDemo.java        # RingBuffer core demo
│   └── DisruptorLearnApplication.java
├── src/test/java/com/example/disruptor/
│   ├── DisruptorCoreFunctionalityTest.java  # Disruptor core functionality tests
│   ├── RingBufferCoreTest.java              # RingBuffer core tests
│   └── WaitStrategyPerformanceTest.java     # Wait strategy performance tests
└── pom.xml
```
## Quick Start
### Requirements
- JDK 21 or later
- Maven 3.6 or later
### Build
```bash
mvn clean compile
```
### Run Tests
```bash
mvn test
```
### Start the Application
```bash
mvn spring-boot:run
```
Once started, visit http://localhost:8080
## Core Features
### 1. Disruptor Core Features
#### Publishing and Consuming Events
```java
// Publish an event
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
producer.publishEvent(orderId, amount, orderType);
// Consume events
disruptor.handleEventsWith(new OrderEventHandler("order-handler"));
```
#### Multiple Consumers
```java
// Parallel processing
disruptor.handleEventsWith(handler1, handler2, handler3);
// Serial processing
disruptor.handleEventsWith(handler1).then(handler2).then(handler3);
```
#### Batch Publishing
```java
// Claim a batch of sequence numbers.
// Note: next(n) returns the HIGHEST sequence of the claimed range.
long hi = ringBuffer.next(batchSize);
long lo = hi - (batchSize - 1);
// Fill the events
for (long seq = lo; seq <= hi; seq++) {
    OrderEvent event = ringBuffer.get(seq);
    // populate the event
}
// Publish the whole range (lo and hi are sequence numbers, not a count)
ringBuffer.publish(lo, hi);
```
### 2. RingBuffer Core Features
#### Circular Data Structure
The RingBuffer is a fixed-size circular array; it computes indices efficiently with a bit mask:
```java
// Sequence-to-index conversion (bitwise AND)
int index = (int) (sequence & (bufferSize - 1));
```
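The bit mask above is why the buffer size must be a power of two: for such sizes, `sequence & (size - 1)` equals `sequence % size` for non-negative sequences, but costs a single AND instruction. A minimal self-contained sketch (the class name `RingIndex` is illustrative, not part of this project):

```java
// Sketch: why bufferSize must be a power of two.
// For power-of-two sizes, (sequence & (size - 1)) matches (sequence % size)
// for non-negative sequences, while compiling to a single AND instruction.
public class RingIndex {
    public static int index(long sequence, int bufferSize) {
        // Assumes bufferSize is a power of two, as Disruptor requires.
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int size = 16;
        for (long seq = 0; seq < 40; seq++) {
            // The mask and the modulo agree for every sequence.
            if (index(seq, size) != (int) (seq % size)) {
                throw new AssertionError("mismatch at " + seq);
            }
        }
        System.out.println("index(20, 16) = " + index(20, size)); // 20 & 15 = 4
    }
}
```

If the size were not a power of two, `size - 1` would not be an all-ones mask and the shortcut would silently map sequences to the wrong slots.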
#### Sequence Mechanism
```java
// Claim a sequence number
long sequence = ringBuffer.next();
// Get the event object
OrderEvent event = ringBuffer.get(sequence);
// Publish the event
ringBuffer.publish(sequence);
```
## API Documentation
### Create a Single Order
```bash
POST /api/orders
Content-Type: application/json
{
  "amount": 100.50,
  "orderType": "NORMAL"
}
```
### Create Orders in Batch
```bash
POST /api/orders/batch
Content-Type: application/json
{
  "count": 10,
  "amount": 50.0,
  "orderType": "BATCH"
}
```
### Check System Status
```bash
GET /api/orders/status
```
## Wait Strategies
Disruptor ships several wait strategies; each trades off latency, throughput, and CPU usage differently:
| Strategy | CPU Usage | Latency | Throughput | Use Case |
|------|----------|------|--------|----------|
| BlockingWaitStrategy | Low | High | Low | General purpose |
| SleepingWaitStrategy | Low | Medium | High | Balanced workloads |
| YieldingWaitStrategy | High | Low | Highest | Low latency |
| BusySpinWaitStrategy | Very high | Lowest | Highest | Ultra-low latency |
| PhasedBackoffWaitStrategy | Adaptive | Adaptive | High | Adaptive workloads |
### Recommendations
- **General purpose**: `SleepingWaitStrategy` — balances performance and CPU usage
- **Low latency**: `YieldingWaitStrategy` — trades CPU usage for low latency
- **Ultra-low latency**: `BusySpinWaitStrategy` — only for latency-critical workloads with CPU to spare
- **Resource-constrained**: `BlockingWaitStrategy` — minimizes CPU usage
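`SleepingWaitStrategy`'s documented behavior — spin first, then `Thread.yield()`, then `LockSupport.parkNanos()` — can be sketched in plain Java. This is an illustrative stand-in for the idea, not Disruptor's actual implementation; the retry thresholds here are arbitrary:

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

// Illustrative sketch of a SleepingWaitStrategy-style backoff loop:
// busy-spin first, then Thread.yield(), then LockSupport.parkNanos().
// Mirrors the strategy's documented phases; it is NOT Disruptor's code.
public class BackoffWait {
    public static long waitFor(long sequence, LongSupplier cursor) {
        int retries = 200;
        long available;
        while ((available = cursor.getAsLong()) < sequence) {
            if (retries > 100) {
                retries--;                      // phase 1: busy-spin
            } else if (retries > 0) {
                retries--;
                Thread.yield();                 // phase 2: yield the CPU
            } else {
                LockSupport.parkNanos(100_000); // phase 3: sleep briefly
            }
        }
        return available;
    }

    public static void main(String[] args) {
        // Cursor is already past the requested sequence: returns immediately.
        System.out.println(waitFor(5, () -> 10L));
    }
}
```

The three phases explain the table above: spinning keeps latency low while the gap is short, and parking caps CPU usage when the wait drags on.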
## Testing
The project contains 27 unit tests covering:
### DisruptorCoreFunctionalityTest
- Disruptor initialization
- Event publishing and consumption
- Sequence mechanism
- Circular behavior
- Parallel processing with multiple consumers
- Serial consumer chains
- Batch publishing
- Different wait strategies
- Event object reuse
### RingBufferCoreTest
- Basic RingBuffer properties
- Sequence-to-index conversion
- Cursor management
- Remaining-capacity calculation
- Wrap-around behavior
- Getting and setting event objects
- Batch sequence allocation
- Power-of-two validation
- Sequence edge cases
- Event object preallocation
- Sequence continuity
### WaitStrategyPerformanceTest
- BlockingWaitStrategy performance
- SleepingWaitStrategy performance
- YieldingWaitStrategy performance
- BusySpinWaitStrategy performance
- PhasedBackoffWaitStrategy performance
- Comparison of all wait strategies
Running the tests:
```bash
# Run all tests
mvn test
# Run a specific test class
mvn test -Dtest=DisruptorCoreFunctionalityTest
# Run a specific test method
mvn test -Dtest=RingBufferCoreTest#testSequenceMechanism
```
## Core Concepts
### RingBuffer
The RingBuffer is Disruptor's core data structure: a fixed-size circular array. Its key traits:
- **Preallocated memory**: all event objects are allocated at initialization, avoiding GC pressure at runtime
- **Circular structure**: indices are computed efficiently with a bit mask
- **Lock-free design**: thread safety comes from the sequence mechanism, not from locks
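The preallocation idea can be sketched without Disruptor itself. In the following self-contained stand-in (the class `PreallocatedRing` is illustrative, not part of this project or of Disruptor's API), every slot is filled once at construction time, and claiming a slot only ever returns an existing object:

```java
import java.util.function.Supplier;

// Minimal sketch of RingBuffer-style preallocation (illustrative, not
// Disruptor's API): every slot is filled with a reusable event object at
// construction, so claiming slots mutates existing objects instead of
// allocating new ones.
public class PreallocatedRing<T> {
    private final Object[] slots;
    private final int mask;
    private long cursor = -1;

    public PreallocatedRing(int size, Supplier<T> factory) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Object[size];
        mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // preallocate every slot exactly once
        }
    }

    @SuppressWarnings("unchecked")
    public T claimNext() {
        cursor++;
        return (T) slots[(int) (cursor & mask)]; // reuse, never allocate
    }

    public long cursor() { return cursor; }
}
```

After `size` claims the cursor wraps and the very same object comes back — which is also why the real Disruptor needs gating sequences to stop producers from overwriting events that consumers have not yet seen.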
### Sequence
A Sequence is an atomically incremented counter used to:
- Track an event's position in the buffer
- Coordinate producers and consumers
- Enable lock-free concurrency
### WaitStrategy
The WaitStrategy decides how consumers wait for new events; it is the key tuning knob for Disruptor performance.
### EventFactory
The EventFactory preallocates event objects when the Disruptor is initialized.
### EventHandler
EventHandler is the consumer callback interface; each EventHandler runs on its own thread.
## Resources
- [Disruptor official docs](https://github.com/lmax-exchange/disruptor/wiki)
- [Disruptor source](https://github.com/lmax-exchange/disruptor)
- [Disruptor performance results](https://github.com/lmax-exchange/disruptor/wiki/Performance-Results)
- [LMAX Disruptor paper](https://lmax-exchange.github.io/disruptor/disruptor.html)
## Best Practices
1. **Buffer size**: must be a power of two (e.g. 1024, 2048, 4096)
2. **Event objects**: provide a clear `clear()` method and wipe data before an event is reused
3. **Wait strategy**: pick the strategy that matches your workload
4. **Batch processing**: use the `endOfBatch` flag to optimize batched operations
5. **Avoid blocking**: never run blocking operations inside an EventHandler
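The `endOfBatch` optimization from point 4 can be sketched as follows. To keep the example self-contained, `Handler` is a local stand-in with the same shape as Disruptor's `EventHandler`; `BatchingHandler` and `Flusher` are illustrative names, not part of this project:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the endOfBatch optimization: buffer events and flush only once
// per batch. The local Handler interface is an illustrative stand-in shaped
// like Disruptor's EventHandler, not the real API.
public class BatchingHandler {
    interface Handler<T> { void onEvent(T event, long sequence, boolean endOfBatch); }

    static class Flusher implements Handler<String> {
        final List<String> buffer = new ArrayList<>();
        int flushes = 0;

        @Override
        public void onEvent(String event, long sequence, boolean endOfBatch) {
            buffer.add(event);
            if (endOfBatch) {   // one expensive commit per batch, not per event
                flushes++;
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        Flusher f = new Flusher();
        // Simulate a batch of 5 events; only the last carries endOfBatch = true.
        for (int i = 0; i < 5; i++) {
            f.onEvent("order-" + i, i, i == 4);
        }
        System.out.println("flushes = " + f.flushes);
    }
}
```

With a real Disruptor, a database-backed handler could use the same shape to turn five single-row inserts into one batched insert per poll cycle.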
## Notes
- All code carries detailed comments and `@see` references
- Tests cover the core features and edge cases
- The project uses JDK 21 and Spring Boot 3.5.0
- Make sure Maven is installed locally (the project does not include the Maven Wrapper)
## License
This project is for learning purposes only and follows Disruptor's Apache 2.0 license.

pom.xml Normal file

@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>disruptor-learn</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>disruptor-learn</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- LMAX Disruptor core library -->
<!-- @see https://github.com/lmax-exchange/disruptor -->
<!-- @see com.lmax.disruptor.RingBuffer -->
<!-- Disruptor is a high-performance inter-thread messaging library for low-latency, high-throughput concurrent event processing -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,13 @@
package com.example.disruptor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DisruptorLearnApplication {
public static void main(String[] args) {
SpringApplication.run(DisruptorLearnApplication.class, args);
}
}


@@ -0,0 +1,236 @@
package com.example.disruptor.config;
import com.example.disruptor.event.OrderEvent;
import com.example.disruptor.event.OrderEventFactory;
import com.example.disruptor.handler.OrderEventHandler;
import com.example.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Disruptor configuration class.
 * <p>
 * Configures Disruptor's core components:
 * <ul>
 *     <li>RingBuffer (the ring buffer)</li>
 *     <li>EventFactory (the event factory)</li>
 *     <li>EventHandler (the event handler)</li>
 *     <li>WaitStrategy (the wait strategy)</li>
 *     <li>ProducerType (the producer type)</li>
 * </ul>
 * </p>
 *
 * <p>Core concepts:</p>
 * <ul>
 *     <li><b>RingBuffer</b>: the ring buffer, Disruptor's core data structure</li>
 *     <li><b>WaitStrategy</b>: decides how consumers wait for new events; affects performance and CPU usage</li>
 *     <li><b>ProducerType</b>: single- or multi-producer mode</li>
 *     <li><b>Buffer Size</b>: must be a power of two</li>
 * </ul>
 *
 * @see com.lmax.disruptor.dsl.Disruptor
 * @see com.lmax.disruptor.RingBuffer
 * @see com.lmax.disruptor.WaitStrategy
 */
@Slf4j
@Configuration
public class DisruptorConfig {
    /**
     * Buffer size.
     * <p>
     * Must be a power of two, because Disruptor computes indices with bitwise operations.
     * Common sizes: 1024, 2048, 4096, 8192.
     * </p>
     * <p>
     * Sizing guidance:
     * <ul>
     *     <li>Too small: producers block frequently</li>
     *     <li>Too large: wastes memory and increases cache misses</li>
     *     <li>Tune based on your workload and measurements</li>
     * </ul>
     * </p>
     */
    private static final int BUFFER_SIZE = 1024;
    /**
     * Thread factory.
     * <p>
     * Creates Disruptor's worker threads.
     * A custom thread factory gives better control over thread names and attributes.
     * </p>
     *
     * @return a custom thread factory
     * @see java.util.concurrent.ThreadFactory
     */
    @Bean
    public ThreadFactory disruptorThreadFactory() {
        AtomicInteger threadNumber = new AtomicInteger(1);
        return r -> {
            Thread thread = new Thread(r);
            thread.setName("disruptor-thread-" + threadNumber.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }
    /**
     * Wait strategy.
     * <p>
     * Decides how consumers wait for new events; the key tuning knob for Disruptor performance.
     * </p>
     *
     * <p>Comparison of common strategies:</p>
     * <table border="1">
     *     <tr><th>Strategy</th><th>CPU usage</th><th>Latency</th><th>Throughput</th><th>Use case</th></tr>
     *     <tr><td>BlockingWaitStrategy</td><td>Low</td><td>High</td><td>Low</td><td>General purpose</td></tr>
     *     <tr><td>SleepingWaitStrategy</td><td>Low</td><td>Medium</td><td>High</td><td>Balanced workloads</td></tr>
     *     <tr><td>YieldingWaitStrategy</td><td>High</td><td>Low</td><td>Highest</td><td>Low latency</td></tr>
     *     <tr><td>BusySpinWaitStrategy</td><td>Very high</td><td>Lowest</td><td>Highest</td><td>Ultra-low latency</td></tr>
     *     <tr><td>PhasedBackoffWaitStrategy</td><td>Adaptive</td><td>Adaptive</td><td>High</td><td>Adaptive workloads</td></tr>
     * </table>
     *
     * @return the wait strategy instance
     * @see com.lmax.disruptor.WaitStrategy
     * @see com.lmax.disruptor.BlockingWaitStrategy
     * @see com.lmax.disruptor.SleepingWaitStrategy
     * @see com.lmax.disruptor.YieldingWaitStrategy
     * @see com.lmax.disruptor.BusySpinWaitStrategy
     */
    @Bean
    public WaitStrategy waitStrategy() {
        // SleepingWaitStrategy balances performance and CPU usage:
        // it spins first, then calls Thread.yield(), and finally LockSupport.parkNanos().
        return new SleepingWaitStrategy();
    }
    /**
     * Disruptor instance.
     * <p>
     * The Disruptor is the framework's entry point, coordinating producers and consumers.
     * </p>
     *
     * <p>Initialization flow:</p>
     * <ol>
     *     <li>Create the Disruptor instance (buffer size, thread factory, wait strategy, ...)</li>
     *     <li>Register EventHandlers (consumers)</li>
     *     <li>Start the Disruptor so it begins processing events</li>
     *     <li>Obtain the RingBuffer for publishing events</li>
     * </ol>
     *
     * <p>Producer type:</p>
     * <ul>
     *     <li><b>SINGLE</b>: single-producer mode; faster, but valid only with exactly one publishing thread</li>
     *     <li><b>MULTI</b>: multi-producer mode; supports concurrent publishers</li>
     * </ul>
     *
     * @param threadFactory the thread factory
     * @param waitStrategy  the wait strategy
     * @return the Disruptor instance
     * @see com.lmax.disruptor.dsl.Disruptor
     * @see com.lmax.disruptor.dsl.ProducerType
     */
    @Bean
    public Disruptor<OrderEvent> disruptor(ThreadFactory threadFactory, WaitStrategy waitStrategy) {
        log.info("Initializing Disruptor, buffer size: {}", BUFFER_SIZE);
        // Create the Disruptor instance. Parameters:
        // 1. eventFactory:  preallocates event objects
        // 2. bufferSize:    buffer size, must be a power of two
        // 3. threadFactory: creates consumer threads
        // 4. producerType:  SINGLE or MULTI
        // 5. waitStrategy:  how consumers wait for new events
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                BUFFER_SIZE,
                threadFactory,
                ProducerType.SINGLE,
                waitStrategy
        );
        // Register the event handler (consumer).
        // Multiple handlers can be registered, in parallel or in series.
        disruptor.handleEventsWith(new OrderEventHandler("order-handler-1"));
        // Parallel handlers:
        // disruptor.handleEventsWith(handler1, handler2, handler3);
        // Serial handlers:
        // disruptor.handleEventsWith(handler1).then(handler2).then(handler3);
        // Start the Disruptor; consumer threads begin waiting for events.
        RingBuffer<OrderEvent> ringBuffer = disruptor.start();
        log.info("Disruptor started, RingBuffer capacity: {}, remaining capacity: {}",
                ringBuffer.getBufferSize(), ringBuffer.remainingCapacity());
        return disruptor;
    }
    /**
     * RingBuffer bean.
     * <p>
     * The RingBuffer is Disruptor's core data structure for storing and passing events.
     * Producers publish events through it; consumers read events from it.
     * </p>
     *
     * <p>Key properties:</p>
     * <ul>
     *     <li>Fixed-size circular array</li>
     *     <li>Preallocated memory to avoid GC</li>
     *     <li>Sequence numbers track event positions</li>
     *     <li>Lock-free, high-performance design</li>
     * </ul>
     *
     * @param disruptor the Disruptor instance
     * @return the RingBuffer instance
     * @see com.lmax.disruptor.RingBuffer
     */
    @Bean
    public RingBuffer<OrderEvent> ringBuffer(Disruptor<OrderEvent> disruptor) {
        return disruptor.getRingBuffer();
    }
    /**
     * OrderEventProducer bean.
     * <p>
     * Wraps the publishing logic in a small, convenient API for business code.
     * </p>
     *
     * @param ringBuffer the RingBuffer instance
     * @return the OrderEventProducer instance
     * @see com.example.disruptor.producer.OrderEventProducer
     */
    @Bean
    public OrderEventProducer orderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        return new OrderEventProducer(ringBuffer);
    }
    /**
     * ExecutorService bean.
     * <p>
     * Manages auxiliary worker threads.
     * A custom ExecutorService gives finer control over the pool than the default thread factory.
     * </p>
     *
     * @return the ExecutorService instance
     * @see java.util.concurrent.ExecutorService
     */
    @Bean
    public ExecutorService executorService() {
        return Executors.newCachedThreadPool(r -> {
            Thread thread = new Thread(r);
            // threadId() replaces Thread.getId(), which is deprecated since JDK 19
            thread.setName("disruptor-executor-" + thread.threadId());
            thread.setDaemon(true);
            return thread;
        });
    }
}


@@ -0,0 +1,197 @@
package com.example.disruptor.controller;
import com.example.disruptor.producer.OrderEventProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Order controller.
 * <p>
 * Exposes REST endpoints that demonstrate Disruptor in action:
 * HTTP requests trigger event publishing so you can watch how events are processed.
 * </p>
 *
 * @see com.example.disruptor.producer.OrderEventProducer
 * @see com.lmax.disruptor.RingBuffer
 */
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {
    /**
     * Order event producer.
     */
    private final OrderEventProducer orderEventProducer;
    /**
     * Order ID counter for generating unique order IDs.
     */
    private final AtomicLong orderIdGenerator = new AtomicLong(1);
    /**
     * Creates a single order.
     * <p>
     * An HTTP POST publishes an event into the Disruptor ring buffer.
     * </p>
     *
     * <p>Example request:</p>
     * <pre>
     * POST /api/orders
     * {
     *     "amount": 100.50,
     *     "orderType": "NORMAL"
     * }
     * </pre>
     *
     * @param orderRequest the order request
     * @return the order creation result
     */
    @PostMapping
    public Map<String, Object> createOrder(@RequestBody OrderRequest orderRequest) {
        long orderId = orderIdGenerator.getAndIncrement();
        log.info("Received order request - orderId: {}, amount: {}, type: {}",
                orderId, orderRequest.getAmount(), orderRequest.getOrderType());
        // Publish the event to the Disruptor
        orderEventProducer.publishEvent(
                orderId,
                orderRequest.getAmount(),
                orderRequest.getOrderType()
        );
        Map<String, Object> response = new HashMap<>();
        response.put("orderId", orderId);
        response.put("amount", orderRequest.getAmount());
        response.put("orderType", orderRequest.getOrderType());
        response.put("message", "Order submitted and is being processed");
        response.put("timestamp", System.currentTimeMillis());
        return response;
    }
    /**
     * Creates orders in batch.
     * <p>
     * Demonstrates batch publishing, which improves publish throughput.
     * </p>
     *
     * <p>Example request:</p>
     * <pre>
     * POST /api/orders/batch
     * {
     *     "count": 10,
     *     "amount": 50.0,
     *     "orderType": "BATCH"
     * }
     * </pre>
     *
     * @param batchRequest the batch order request
     * @return the batch creation result
     */
    @PostMapping("/batch")
    public Map<String, Object> createOrdersBatch(@RequestBody BatchOrderRequest batchRequest) {
        int count = batchRequest.getCount();
        Object[][] orders = new Object[count][3];
        for (int i = 0; i < count; i++) {
            long orderId = orderIdGenerator.getAndIncrement();
            orders[i][0] = orderId;
            orders[i][1] = batchRequest.getAmount();
            orders[i][2] = batchRequest.getOrderType();
        }
        log.info("Received batch order request - count: {}, amount: {}, type: {}",
                count, batchRequest.getAmount(), batchRequest.getOrderType());
        // Publish the events as one batch
        orderEventProducer.publishEventsBatch(orders);
        Map<String, Object> response = new HashMap<>();
        response.put("count", count);
        response.put("amount", batchRequest.getAmount());
        response.put("orderType", batchRequest.getOrderType());
        response.put("message", "Batch submitted and is being processed");
        response.put("timestamp", System.currentTimeMillis());
        return response;
    }
    /**
     * Returns system status.
     *
     * @return system status info
     */
    @GetMapping("/status")
    public Map<String, Object> getStatus() {
        Map<String, Object> status = new HashMap<>();
        // The generator holds the NEXT id (it starts at 1), so subtract 1 for the count created
        status.put("totalOrders", orderIdGenerator.get() - 1);
        status.put("timestamp", System.currentTimeMillis());
        status.put("status", "running");
        return status;
    }
    /**
     * Single-order request body.
     */
    public static class OrderRequest {
        private double amount;
        private String orderType = "NORMAL";
        public double getAmount() { return amount; }
        public void setAmount(double amount) { this.amount = amount; }
        public String getOrderType() { return orderType; }
        public void setOrderType(String orderType) { this.orderType = orderType; }
    }
    /**
     * Batch-order request body.
     */
    public static class BatchOrderRequest {
        private int count = 10;
        private double amount;
        private String orderType = "BATCH";
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
        public double getAmount() { return amount; }
        public void setAmount(double amount) { this.amount = amount; }
        public String getOrderType() { return orderType; }
        public void setOrderType(String orderType) { this.orderType = orderType; }
    }
}


@@ -0,0 +1,65 @@
package com.example.disruptor.event;
import lombok.Data;
/**
 * Order event.
 * <p>
 * The event object passed through the Disruptor ring buffer.
 * Disruptor preallocates these objects to avoid garbage collection at runtime.
 * </p>
 *
 * <p>Design notes:</p>
 * <ul>
 *     <li>The object must be reusable; avoid creating new objects while processing events</li>
 *     <li>Provide a clear() method to wipe data before the event is reused</li>
 *     <li>Keep the structure simple to minimize memory footprint</li>
 * </ul>
 *
 * @see com.lmax.disruptor.RingBuffer
 * @see com.lmax.disruptor.EventFactory
 * @see <a href="https://github.com/lmax-exchange/disruptor/wiki/Core-Concepts">Disruptor Core Concepts</a>
 */
@Data
public class OrderEvent {
    /**
     * Order ID.
     */
    private long orderId;
    /**
     * Order amount.
     */
    private double amount;
    /**
     * Order type.
     */
    private String orderType;
    /**
     * Event creation timestamp (nanoseconds).
     */
    private long timestamp;
    /**
     * Clears the event's data.
     * <p>
     * Disruptor reuses event objects, so previous data must be wiped before an
     * event slot is used again; this avoids leaks and stale data.
     * </p>
     * <p>
     * Best practice: call clear() after the last EventHandler has processed the
     * event, or register a dedicated clearing handler at the end of the chain.
     * </p>
     */
    public void clear() {
        this.orderId = 0;
        this.amount = 0.0;
        this.orderType = null;
        this.timestamp = 0;
    }
}


@@ -0,0 +1,45 @@
package com.example.disruptor.event;
import com.lmax.disruptor.EventFactory;
/**
 * Order event factory.
 * <p>
 * Implements {@link EventFactory}; Disruptor uses it to preallocate event objects at initialization.
 * </p>
 *
 * <p>Core concepts:</p>
 * <ul>
 *     <li>At startup, Disruptor preallocates a fixed number of event objects into the ring buffer</li>
 *     <li>These objects are reused at runtime, avoiding frequent allocation and garbage collection</li>
 *     <li>This preallocation strategy is one of the keys to Disruptor's performance</li>
 * </ul>
 *
 * <p>Lifecycle:</p>
 * <ul>
 *     <li>Disruptor calls newInstance() bufferSize times during initialization</li>
 *     <li>The preallocated objects live in the RingBuffer and are cycled through</li>
 *     <li>Producers fill an object with data; after consumers finish, the object is reused</li>
 * </ul>
 *
 * @see com.lmax.disruptor.EventFactory
 * @see com.lmax.disruptor.RingBuffer
 * @see <a href="https://github.com/lmax-exchange/disruptor/wiki/Performance-Results">Disruptor Performance</a>
 */
public class OrderEventFactory implements EventFactory<OrderEvent> {
    /**
     * Creates a new event object.
     * <p>
     * Called repeatedly while Disruptor pre-fills the ring buffer.
     * Each call must return a new, independent event instance.
     * </p>
     *
     * @return a new OrderEvent instance
     * @see com.lmax.disruptor.RingBuffer#createSingleProducer(com.lmax.disruptor.EventFactory, int)
     */
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}


@@ -0,0 +1,140 @@
package com.example.disruptor.handler;
import com.example.disruptor.event.OrderEvent;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Order event handler.
 * <p>
 * Implements {@link EventHandler} to process events consumed from the ring buffer.
 * This is the consumer side of the Disruptor pattern.
 * </p>
 *
 * <p>Core concepts:</p>
 * <ul>
 *     <li>EventHandler is the consumer callback interface</li>
 *     <li>Each EventHandler runs on its own thread</li>
 *     <li>Multiple EventHandlers can process events in parallel or in series</li>
 *     <li>The sequence parameter tracks the event's position in the buffer</li>
 * </ul>
 *
 * <p>Parameters:</p>
 * <ul>
 *     <li><b>event</b>: the event object taken from the RingBuffer</li>
 *     <li><b>sequence</b>: the event's sequence number, useful for tracing and debugging</li>
 *     <li><b>endOfBatch</b>: whether this is the last event of the current batch; useful for batch optimizations</li>
 * </ul>
 *
 * @see com.lmax.disruptor.EventHandler
 * @see com.lmax.disruptor.BatchEventProcessor
 * @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(EventHandler[])
 */
@Slf4j
public class OrderEventHandler implements EventHandler<OrderEvent> {
    /**
     * Handler name, used to tell handler instances apart.
     */
    private final String handlerName;
    /**
     * Constructor.
     *
     * @param handlerName handler name, for log tracing and debugging
     */
    public OrderEventHandler(String handlerName) {
        this.handlerName = handlerName;
    }
    /**
     * Core event-processing method.
     * <p>
     * Disruptor calls this method when an event becomes available; it runs on
     * the handler's own thread.
     * </p>
     *
     * <p>Processing flow:</p>
     * <ol>
     *     <li>Receive the event handed over from the RingBuffer</li>
     *     <li>Run the business logic (simulated order processing here)</li>
     *     <li>Optionally clear the event's data (preferably in the last handler of the chain)</li>
     * </ol>
     *
     * <p>Performance tips:</p>
     * <ul>
     *     <li>Avoid allocating new objects inside onEvent</li>
     *     <li>Use the endOfBatch flag for batch optimizations (e.g. batched database writes)</li>
     *     <li>Keep the logic lean and avoid blocking operations</li>
     * </ul>
     *
     * @param event      the event object taken from the ring buffer
     * @param sequence   the event's sequence number, for tracing and debugging
     * @param endOfBatch whether this is the last event of the current batch;
     *                   when true, the batch is complete, which is useful for
     *                   scenarios that commit in bulk (e.g. batched database writes)
     *
     * @see com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)
     * @see com.lmax.disruptor.BatchEventProcessor#run()
     */
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("[{}] processing event - sequence: {}, orderId: {}, amount: {}, type: {}, timestamp: {}",
                handlerName, sequence, event.getOrderId(), event.getAmount(),
                event.getOrderType(), event.getTimestamp());
        // Simulated business logic
        processOrder(event);
        // If this is the last event of the batch, run the batched operation
        if (endOfBatch) {
            log.info("[{}] batch complete, running batch commit", handlerName);
            batchCommit();
        }
    }
    /**
     * Simulates order processing.
     * <p>
     * In a real application this might include:
     * <ul>
     *     <li>Validation</li>
     *     <li>Database operations</li>
     *     <li>Messaging</li>
     *     <li>Other business logic</li>
     * </ul>
     * </p>
     *
     * @param event the order event
     */
    private void processOrder(OrderEvent event) {
        // Real processing would go here,
        // e.g. validating the order, applying discounts, updating stock.
        // Simulate processing time.
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("[{}] order processing interrupted", handlerName);
        }
    }
    /**
     * Batch commit.
     * <p>
     * Called when endOfBatch is true, to amortize expensive work across a batch.
     * Typical uses:
     * <ul>
     *     <li>Bulk database inserts</li>
     *     <li>Bulk message sends</li>
     *     <li>Bulk file writes</li>
     * </ul>
     * </p>
     */
    private void batchCommit() {
        // Batch commit logic would go here,
        // e.g. bulk database writes or bulk sends to a message queue.
    }
}


@@ -0,0 +1,131 @@
package com.example.disruptor.producer;
import com.example.disruptor.event.OrderEvent;
import com.lmax.disruptor.RingBuffer;
/**
 * Order event producer.
 * <p>
 * Publishes order data into the Disruptor ring buffer.
 * This is the producer side of the Disruptor pattern.
 * </p>
 *
 * <p>Core concepts:</p>
 * <ul>
 *     <li>Producers publish events through the RingBuffer</li>
 *     <li>Publishing has two phases: claiming a sequence and publishing the data</li>
 *     <li>Disruptor supports single- and multi-producer modes</li>
 *     <li>try-finally guarantees the claimed slot is published even on exceptions</li>
 * </ul>
 *
 * <p>Publishing flow:</p>
 * <ol>
 *     <li>Call ringBuffer.next() to claim the next available sequence</li>
 *     <li>Fetch the event object for that sequence</li>
 *     <li>Fill in the event data</li>
 *     <li>Call ringBuffer.publish(sequence) to publish the event</li>
 * </ol>
 *
 * @see com.lmax.disruptor.RingBuffer
 * @see com.lmax.disruptor.dsl.Disruptor#getRingBuffer()
 * @see <a href="https://github.com/lmax-exchange/disruptor/wiki/Getting-Started">Disruptor Getting Started</a>
 */
public class OrderEventProducer {
    /**
     * Reference to the ring buffer.
     * <p>
     * The RingBuffer is Disruptor's core data structure: a fixed-size circular
     * array backed by preallocated memory, which avoids runtime garbage collection.
     * </p>
     *
     * @see com.lmax.disruptor.RingBuffer
     */
    private final RingBuffer<OrderEvent> ringBuffer;
    /**
     * Constructor.
     *
     * @param ringBuffer the ring buffer instance
     */
    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    /**
     * Publishes an order event (classic two-phase style).
     * <p>
     * This is Disruptor's classic publishing style, often called "two-phase commit":
     * <ol>
     *     <li>Phase one: claim a sequence with next() and fetch the event object</li>
     *     <li>Phase two: fill in the data and call publish()</li>
     * </ol>
     * </p>
     *
     * <p>Key points:</p>
     * <ul>
     *     <li>Wrap the work in try-finally so publish() always runs</li>
     *     <li>Even on exceptions the slot must be published, or consumers stall</li>
     *     <li>This style is the most flexible but requires manual sequence handling</li>
     * </ul>
     *
     * @param orderId   the order ID
     * @param amount    the order amount
     * @param orderType the order type
     *
     * @see com.lmax.disruptor.RingBuffer#next()
     * @see com.lmax.disruptor.RingBuffer#get(long)
     * @see com.lmax.disruptor.RingBuffer#publish(long)
     */
    public void publishEvent(long orderId, double amount, String orderType) {
        // 1. Claim the next available sequence.
        //    If the buffer is full, this blocks until a slot frees up.
        long sequence = ringBuffer.next();
        try {
            // 2. Fetch the event object for that sequence.
            //    This is a preallocated object, not a new one.
            OrderEvent event = ringBuffer.get(sequence);
            // 3. Fill in the event data.
            event.setOrderId(orderId);
            event.setAmount(amount);
            event.setOrderType(orderType);
            event.setTimestamp(System.nanoTime());
        } finally {
            // 4. Publish the event, making it visible to consumers.
            //    Must run in finally so the slot is published even on exceptions.
            ringBuffer.publish(sequence);
        }
    }
    /**
     * Publishes a batch of order events.
     * <p>
     * Batch publishing amortizes the claim/publish overhead and raises throughput.
     * Useful when several events are published back to back.
     * </p>
     *
     * @param orders array of [orderId, amount, orderType] triples
     */
    public void publishEventsBatch(Object[][] orders) {
        // Claim a range of sequences.
        // Note: next(n) returns the HIGHEST sequence of the claimed range.
        long hi = ringBuffer.next(orders.length);
        long lo = hi - (orders.length - 1);
        try {
            long sequence = lo;
            for (Object[] order : orders) {
                OrderEvent event = ringBuffer.get(sequence);
                event.setOrderId((Long) order[0]);
                event.setAmount((Double) order[1]);
                event.setOrderType((String) order[2]);
                event.setTimestamp(System.nanoTime());
                sequence++;
            }
        } finally {
            // Publish the whole range (lo and hi are sequence numbers, not a count)
            ringBuffer.publish(lo, hi);
        }
    }
}


@@ -0,0 +1,380 @@
package com.example.disruptor.ringbuffer;
import com.example.disruptor.event.OrderEvent;
import com.example.disruptor.event.OrderEventFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * RingBuffer core-feature demo.
 * <p>
 * Demonstrates the core features of Disruptor's ring buffer:
 * <ul>
 *     <li>How the circular data structure works</li>
 *     <li>What sequence numbers (Sequence) do</li>
 *     <li>The publish/consume flow</li>
 *     <li>What happens when the buffer is full</li>
 *     <li>Parallel processing with multiple consumers</li>
 * </ul>
 * </p>
 *
 * <p>Core concepts:</p>
 * <ul>
 *     <li><b>RingBuffer</b>: fixed-size circular array with bit-mask index computation</li>
 *     <li><b>Sequence</b>: tracks an event's position in the buffer</li>
 *     <li><b>Cursor</b>: points at the highest published sequence</li>
 *     <li><b>Gating Sequence</b>: prevents producers from overwriting unconsumed events</li>
 * </ul>
 *
 * @see com.lmax.disruptor.RingBuffer
 * @see com.lmax.disruptor.Sequence
 * @see <a href="https://github.com/lmax-exchange/disruptor/wiki/Introduction">Disruptor Introduction</a>
 */
@Slf4j
public class RingBufferDemo {
    /**
     * Buffer size (must be a power of two).
     */
    private static final int BUFFER_SIZE = 16;
    /**
     * Demonstrates basic RingBuffer creation and use.
     */
    public static void demoBasicRingBuffer() {
        log.info("========== Basic RingBuffer usage ==========");
        // Create the RingBuffer. Parameters:
        // 1. producerType: SINGLE or MULTI
        // 2. eventFactory: preallocates event objects
        // 3. bufferSize:   must be a power of two
        // 4. waitStrategy: the wait strategy
        RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
                ProducerType.SINGLE,
                new OrderEventFactory(),
                BUFFER_SIZE,
                new SleepingWaitStrategy()
        );
        log.info("RingBuffer created, capacity: {}, remaining capacity: {}",
                ringBuffer.getBufferSize(), ringBuffer.remainingCapacity());
        // Publish a few events
        for (int i = 1; i <= 5; i++) {
            long sequence = ringBuffer.next();
            try {
                OrderEvent event = ringBuffer.get(sequence);
                event.setOrderId(i);
                event.setAmount(i * 100.0);
                event.setOrderType("DEMO");
                event.setTimestamp(System.nanoTime());
            } finally {
                ringBuffer.publish(sequence);
            }
        }
        log.info("Published 5 events, cursor: {}, remaining capacity: {}",
                ringBuffer.getCursor(), ringBuffer.remainingCapacity());
    }
    /**
     * Demonstrates the ring buffer's circular behavior.
     * <p>
     * When more events are published than the buffer can hold, the RingBuffer
     * wraps around and reuses earlier slots; that is what "ring" means.
     * </p>
     */
    public static void demoRingBufferCircularity() {
        log.info("========== RingBuffer circular behavior ==========");
        RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
                ProducerType.SINGLE,
                new OrderEventFactory(),
                BUFFER_SIZE,
                new SleepingWaitStrategy()
        );
        // Publish more events than the buffer can hold
        int totalEvents = BUFFER_SIZE + 5;
        for (int i = 1; i <= totalEvents; i++) {
            long sequence = ringBuffer.next();
            try {
                OrderEvent event = ringBuffer.get(sequence);
                event.setOrderId(i);
                event.setAmount(i * 100.0);
                event.setOrderType("CIRCULAR");
                event.setTimestamp(System.nanoTime());
            } finally {
                ringBuffer.publish(sequence);
            }
            // Log the state every few events
            if (i % 5 == 0) {
                log.info("Published {} events, cursor: {}, remaining capacity: {}",
                        i, ringBuffer.getCursor(), ringBuffer.remainingCapacity());
            }
        }
        log.info("Published {} events in total (beyond buffer capacity {}), cursor: {}",
                totalEvents, BUFFER_SIZE, ringBuffer.getCursor());
    }
    /**
     * Demonstrates the sequence mechanism.
     * <p>
     * Sequence numbers are one of Disruptor's core mechanisms, used to:
     * <ul>
     *     <li>Track an event's position in the buffer</li>
     *     <li>Coordinate producers and consumers</li>
     *     <li>Enable lock-free concurrency</li>
     * </ul>
     * </p>
     */
    public static void demoSequenceMechanism() {
        log.info("========== Sequence mechanism ==========");
        RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
                ProducerType.SINGLE,
                new OrderEventFactory(),
                BUFFER_SIZE,
                new SleepingWaitStrategy()
        );
        // Show how sequences are allocated
        log.info("Initial state - cursor: {}, buffer size: {}",
                ringBuffer.getCursor(), ringBuffer.getBufferSize());
        // Claim and publish events
        long seq1 = ringBuffer.next();
        OrderEvent event1 = ringBuffer.get(seq1);
        event1.setOrderId(1);
        event1.setAmount(100.0);
        event1.setOrderType("SEQ");
        ringBuffer.publish(seq1);
        log.info("Published event 1 - sequence: {}, cursor: {}", seq1, ringBuffer.getCursor());
        long seq2 = ringBuffer.next();
        OrderEvent event2 = ringBuffer.get(seq2);
        event2.setOrderId(2);
        event2.setAmount(200.0);
        event2.setOrderType("SEQ");
        ringBuffer.publish(seq2);
        log.info("Published event 2 - sequence: {}, cursor: {}", seq2, ringBuffer.getCursor());
        // Show the sequence-to-index conversion.
        // The RingBuffer maps a sequence to an array index with a bit mask:
        // index = sequence & (bufferSize - 1)
        long sequence = 20;
        int index = (int) (sequence & (BUFFER_SIZE - 1));
        log.info("Sequence {} maps to index {} (bitwise: {} & {})",
                sequence, index, sequence, BUFFER_SIZE - 1);
    }
    /**
     * Demonstrates what happens when the buffer fills up.
     * <p>
     * When producers outpace consumers the buffer fills up, and next() blocks
     * once the slowest registered gating sequence lags a full buffer behind.
     * The WaitStrategy governs how the producer waits.
     * </p>
     */
    public static void demoBufferFullScenario() throws InterruptedException {
        log.info("========== Buffer-full scenario ==========");
        // A small buffer keeps the demo short
        int smallBufferSize = 8;
        CountDownLatch latch = new CountDownLatch(smallBufferSize);
        RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
                ProducerType.SINGLE,
                new OrderEventFactory(),
                smallBufferSize,
                new SleepingWaitStrategy()
        );
        // Simulate a slow consumer
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(1000); // start consuming after 1 second
                log.info("Consumer starting...");
                for (int i = 0; i < smallBufferSize; i++) {
                    Thread.sleep(500); // 500 ms per event
                    latch.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Try to publish more events than the buffer can hold
        log.info("Publishing events (buffer capacity: {})", smallBufferSize);
        long startTime = System.currentTimeMillis();
        for (int i = 1; i <= smallBufferSize + 2; i++) {
            log.info("Trying to publish event {}...", i);
            long sequence = ringBuffer.next(); // blocks here when a gating sequence makes the buffer full
            try {
                OrderEvent event = ringBuffer.get(sequence);
                event.setOrderId(i);
                event.setAmount(i * 100.0);
                event.setOrderType("FULL_TEST");
                event.setTimestamp(System.nanoTime());
            } finally {
                ringBuffer.publish(sequence);
            }
            log.info("Event {} published", i);
        }
        long endTime = System.currentTimeMillis();
        log.info("All events published, took: {}ms", endTime - startTime);
        latch.await();
        executor.shutdown();
    }
    /**
     * Demonstrates parallel processing with multiple consumers.
     * <p>
     * Disruptor supports several consumers running in parallel; each consumer
     * receives every event.
     * </p>
     */
    public static void demoMultipleConsumers() throws InterruptedException {
        log.info("========== Multiple consumers in parallel ==========");
        AtomicInteger counter1 = new AtomicInteger(0);
        AtomicInteger counter2 = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(10);
        // Thread factory
        ThreadFactory threadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        };
        // Create the Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
                new OrderEventFactory(),
                BUFFER_SIZE,
                threadFactory,
                ProducerType.SINGLE,
                new SleepingWaitStrategy()
        );
        // Register two consumers; both process every event in parallel
        disruptor.handleEventsWith(
                (event, sequence, endOfBatch) -> {
                    counter1.incrementAndGet();
                    log.info("Consumer 1 - sequence: {}, orderId: {}",
                            sequence, event.getOrderId());
                    latch.countDown();
                },
                (event, sequence, endOfBatch) -> {
                    counter2.incrementAndGet();
                    log.info("Consumer 2 - sequence: {}, orderId: {}",
                            sequence, event.getOrderId());
                }
        );
        // Start the Disruptor
        RingBuffer<OrderEvent> ringBuffer = disruptor.start();
        // Publish events
        for (int i = 1; i <= 10; i++) {
            long sequence = ringBuffer.next();
            try {
                OrderEvent event = ringBuffer.get(sequence);
                event.setOrderId(i);
                event.setAmount(i * 100.0);
                event.setOrderType("MULTI_CONSUMER");
                event.setTimestamp(System.nanoTime());
            } finally {
                ringBuffer.publish(sequence);
            }
        }
        // Wait for the consumers to finish
        latch.await();
        log.info("Consumer 1 processed {} events", counter1.get());
        log.info("Consumer 2 processed {} events", counter2.get());
        disruptor.shutdown();
    }
/**
* 演示使用 EventTranslator 发布事件
* <p>
* EventTranslator 是一种更优雅的事件发布方式,它封装了数据填充逻辑。
* </p>
*/
public static void demoEventTranslator() {
log.info("========== 演示 EventTranslator 使用 ==========");
RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
BUFFER_SIZE,
new SleepingWaitStrategy()
);
// 定义一个 EventTranslator
EventTranslator<OrderEvent> translator = (event, sequence) -> {
event.setOrderId(999);
event.setAmount(999.99);
event.setOrderType("TRANSLATOR");
event.setTimestamp(System.nanoTime());
};
// 使用 EventTranslator 发布事件
ringBuffer.publishEvent(translator);
log.info("使用 EventTranslator 发布事件成功");
// 使用 EventTranslatorVararg 发布事件(带参数)
EventTranslatorVararg<OrderEvent> varargTranslator = (event, sequence, args) -> {
event.setOrderId((Long) args[0]);
event.setAmount((Double) args[1]);
event.setOrderType((String) args[2]);
event.setTimestamp(System.nanoTime());
};
ringBuffer.publishEvent(varargTranslator, 1000L, 1000.0, "VARARG");
log.info("使用 EventTranslatorVararg 发布事件成功");
}
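EventTranslator 的价值在于把"取槽位、填数据、发布"的样板代码收敛成一个函数式回调,调用方只负责填充。下面用标准库演示同样的模式(示意代码,不依赖 Disruptor`Event`、`publishEvent` 均为假设性的名字):

```java
import java.util.function.BiConsumer;

public class TranslatorPatternSketch {
    static class Event { long orderId; double amount; }

    // 预分配的"槽位",模拟 RingBuffer 中被复用的事件对象
    static final Event SLOT = new Event();

    // publishEvent 的简化版:框架负责取对象与"发布",回调只负责填充
    static void publishEvent(BiConsumer<Event, Long> translator, long sequence) {
        translator.accept(SLOT, sequence); // 填充复用对象,而非创建新对象
        // 真实实现中这里还会执行 ringBuffer.publish(sequence)
    }

    public static void main(String[] args) {
        publishEvent((event, seq) -> {
            event.orderId = 999;
            event.amount = 999.99;
        }, 0L);
        System.out.println("orderId=" + SLOT.orderId + ", amount=" + SLOT.amount);
    }
}
```

这正是"对象复用 + 回调填充"避免 GC 压力的核心思路:调用方永远拿不到新对象,只能在既有槽位上写数据。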
/**
* 主方法,运行所有演示
*/
public static void main(String[] args) throws InterruptedException {
log.info("开始 RingBuffer 核心功能演示");
demoBasicRingBuffer();
System.out.println();
demoRingBufferCircularity();
System.out.println();
demoSequenceMechanism();
System.out.println();
demoBufferFullScenario();
System.out.println();
demoMultipleConsumers();
System.out.println();
demoEventTranslator();
log.info("所有演示完成");
}
}


@@ -0,0 +1 @@
spring.application.name=disruptor-learn


@@ -0,0 +1,605 @@
package com.example.disruptor;
import com.example.disruptor.event.OrderEvent;
import com.example.disruptor.event.OrderEventFactory;
import com.example.disruptor.handler.OrderEventHandler;
import com.example.disruptor.producer.OrderEventProducer;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
* Disruptor 核心功能单元测试
* <p>
* 本测试类全面测试 Disruptor 的核心功能,包括:
* <ul>
* <li>事件发布和消费</li>
* <li>环形缓冲区的基本操作</li>
* <li>序列号机制</li>
* <li>多消费者并行处理</li>
* <li>不同等待策略的行为</li>
* </ul>
* </p>
*
* <p>测试设计原则:</p>
* <ul>
* <li>每个测试方法独立运行,互不影响</li>
* <li>使用 CountDownLatch 确保异步操作完成后再断言</li>
* <li>详细记录测试过程,便于问题排查</li>
* <li>测试覆盖正常场景和边界场景</li>
* </ul>
*
* @see com.lmax.disruptor.dsl.Disruptor
* @see com.lmax.disruptor.RingBuffer
* @see com.lmax.disruptor.EventHandler
*/
@Slf4j
@DisplayName("Disruptor 核心功能测试")
class DisruptorCoreFunctionalityTest {
/**
* 缓冲区大小必须是2的幂次方
*/
private static final int BUFFER_SIZE = 1024;
/**
* 线程工厂
*/
private ThreadFactory threadFactory;
/**
* 线程池
*/
private ExecutorService executorService;
/**
* 测试前初始化
*/
@BeforeEach
void setUp() {
AtomicInteger threadNumber = new AtomicInteger(1);
threadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("test-disruptor-" + threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
};
executorService = Executors.newCachedThreadPool(threadFactory);
log.info("========== 测试开始 ==========");
}
/**
* 测试后清理
*/
@AfterEach
void tearDown() {
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
}
log.info("========== 测试结束 ==========\n");
}
/**
* 测试基本的 Disruptor 初始化和启动
* <p>
* 验证点:
* <ul>
* <li>Disruptor 能够正确初始化</li>
* <li>RingBuffer 能够正确创建</li>
* <li>缓冲区大小设置正确</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.dsl.Disruptor#Disruptor(com.lmax.disruptor.EventFactory, int, java.util.concurrent.ThreadFactory, com.lmax.disruptor.dsl.ProducerType, com.lmax.disruptor.WaitStrategy)
*/
@Test
@DisplayName("测试 Disruptor 初始化")
void testDisruptorInitialization() {
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
new SleepingWaitStrategy()
);
// 获取 RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
// 验证 RingBuffer 属性
assertNotNull(ringBuffer, "RingBuffer 不应为 null");
assertEquals(BUFFER_SIZE, ringBuffer.getBufferSize(), "缓冲区大小应为 " + BUFFER_SIZE);
assertEquals(BUFFER_SIZE, ringBuffer.remainingCapacity(), "初始剩余容量应为 " + BUFFER_SIZE);
assertEquals(-1, ringBuffer.getCursor(), "初始游标应为 -1");
log.info("Disruptor 初始化测试通过 - 缓冲区大小: {}, 剩余容量: {}, 游标: {}",
ringBuffer.getBufferSize(), ringBuffer.remainingCapacity(), ringBuffer.getCursor());
disruptor.shutdown();
}
/**
* 测试事件发布和消费的基本流程
* <p>
* 验证点:
* <ul>
* <li>生产者能够成功发布事件</li>
* <li>消费者能够接收到并处理事件</li>
* <li>事件数据正确传递</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#next()
* @see com.lmax.disruptor.RingBuffer#publish(long)
* @see com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)
*/
@Test
@DisplayName("测试事件发布和消费")
void testEventPublishAndConsume() throws InterruptedException {
// 用于计数处理的事件数量
AtomicInteger processedCount = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(5);
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
new SleepingWaitStrategy()
);
// 注册事件处理器
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
log.info("处理事件 - 序列号: {}, 订单ID: {}, 金额: {}",
sequence, event.getOrderId(), event.getAmount());
processedCount.incrementAndGet();
latch.countDown();
});
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 创建生产者并发布事件
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
for (int i = 1; i <= 5; i++) {
producer.publishEvent(i, i * 100.0, "TEST");
}
// 等待所有事件被处理
latch.await();
// 验证结果
assertEquals(5, processedCount.get(), "应该处理 5 个事件");
log.info("事件发布和消费测试通过 - 处理数量: {}", processedCount.get());
disruptor.shutdown();
}
/**
* 测试 RingBuffer 的序列号机制
* <p>
* 验证点:
* <ul>
* <li>序列号正确递增</li>
* <li>序列号到索引的转换正确</li>
* <li>游标正确跟踪发布位置</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.Sequence
* @see com.lmax.disruptor.RingBuffer#next()
* @see com.lmax.disruptor.RingBuffer#getCursor()
*/
@Test
@DisplayName("测试序列号机制")
void testSequenceMechanism() {
RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
BUFFER_SIZE,
new SleepingWaitStrategy()
);
// 发布事件并验证序列号
long expectedSequence = 0;
for (int i = 0; i < 10; i++) {
long sequence = ringBuffer.next();
assertEquals(expectedSequence, sequence, "序列号应为 " + expectedSequence);
// 计算索引(使用位运算)
int index = (int) (sequence & (BUFFER_SIZE - 1));
assertTrue(index >= 0 && index < BUFFER_SIZE, "索引应在有效范围内");
ringBuffer.publish(sequence);
expectedSequence++;
}
// 验证游标
assertEquals(9, ringBuffer.getCursor(), "游标应为 9");
log.info("序列号机制测试通过 - 最后序列号: {}, 游标: {}", expectedSequence - 1, ringBuffer.getCursor());
}
/**
* 测试 RingBuffer 的环形特性
* <p>
* 验证点:
* <ul>
* <li>当序列号超过缓冲区大小时,索引正确循环</li>
* <li>序列号持续递增,不受缓冲区大小限制</li>
* <li>索引在 0 到 bufferSize-1 之间循环</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#get(long)
*/
@Test
@DisplayName("测试环形缓冲区的环形特性")
void testRingBufferCircularity() {
RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
BUFFER_SIZE,
new SleepingWaitStrategy()
);
// 发布超过缓冲区容量的事件
int totalEvents = BUFFER_SIZE + 10;
for (int i = 0; i < totalEvents; i++) {
long sequence = ringBuffer.next();
OrderEvent event = ringBuffer.get(sequence);
event.setOrderId(i);
ringBuffer.publish(sequence);
}
// 验证序列号持续递增
assertEquals(totalEvents - 1, ringBuffer.getCursor(), "游标应为 " + (totalEvents - 1));
// 验证索引循环
long sequence = BUFFER_SIZE;
int index = (int) (sequence & (BUFFER_SIZE - 1));
assertEquals(0, index, "序列号 " + BUFFER_SIZE + " 的索引应为 0");
sequence = BUFFER_SIZE + 1;
index = (int) (sequence & (BUFFER_SIZE - 1));
assertEquals(1, index, "序列号 " + (BUFFER_SIZE + 1) + " 的索引应为 1");
log.info("环形特性测试通过 - 总事件数: {}, 最后序列号: {}", totalEvents, ringBuffer.getCursor());
}
/**
* 测试多消费者并行处理
* <p>
* 验证点:
* <ul>
* <li>多个消费者能够并行处理事件</li>
* <li>每个消费者都能接收到所有事件</li>
* <li>消费者之间互不干扰</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(com.lmax.disruptor.EventHandler[])
*/
@Test
@DisplayName("测试多消费者并行处理")
void testMultipleConsumers() throws InterruptedException {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(30); // 10个事件 * 3个消费者
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
new SleepingWaitStrategy()
);
// 注册三个消费者,它们会并行处理所有事件
disruptor.handleEventsWith(
(event, sequence, endOfBatch) -> {
count1.incrementAndGet();
log.info("消费者1 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
latch.countDown();
},
(event, sequence, endOfBatch) -> {
count2.incrementAndGet();
log.info("消费者2 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
latch.countDown();
},
(event, sequence, endOfBatch) -> {
count3.incrementAndGet();
log.info("消费者3 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
latch.countDown();
}
);
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 发布事件
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
for (int i = 1; i <= 10; i++) {
producer.publishEvent(i, i * 100.0, "MULTI_CONSUMER");
}
// 等待所有消费者处理完成
latch.await();
// 验证结果
assertEquals(10, count1.get(), "消费者1 应处理 10 个事件");
assertEquals(10, count2.get(), "消费者2 应处理 10 个事件");
assertEquals(10, count3.get(), "消费者3 应处理 10 个事件");
log.info("多消费者测试通过 - 消费者1: {}, 消费者2: {}, 消费者3: {}",
count1.get(), count2.get(), count3.get());
disruptor.shutdown();
}
/**
* 测试消费者串行处理(依赖链)
* <p>
* 验证点:
* <ul>
* <li>消费者可以按依赖链顺序处理事件</li>
* <li>后一个消费者必须等待前一个消费者完成</li>
* <li>事件按顺序传递</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(com.lmax.disruptor.EventHandler)
* @see com.lmax.disruptor.dsl.Disruptor#after(com.lmax.disruptor.EventHandler[])
*/
@Test
@DisplayName("测试消费者串行处理(依赖链)")
void testSequentialConsumers() throws InterruptedException {
AtomicInteger count1 = new AtomicInteger(0);
AtomicInteger count2 = new AtomicInteger(0);
AtomicInteger count3 = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(10);
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
new SleepingWaitStrategy()
);
// 注册消费者依赖链handler1 -> handler2 -> handler3
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
count1.incrementAndGet();
log.info("处理器1 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
}).then((event, sequence, endOfBatch) -> {
count2.incrementAndGet();
log.info("处理器2 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
}).then((event, sequence, endOfBatch) -> {
count3.incrementAndGet();
log.info("处理器3 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
latch.countDown();
});
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 发布事件
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
for (int i = 1; i <= 10; i++) {
producer.publishEvent(i, i * 100.0, "SEQUENTIAL");
}
// 等待处理完成
latch.await();
// 验证结果
assertEquals(10, count1.get(), "处理器1 应处理 10 个事件");
assertEquals(10, count2.get(), "处理器2 应处理 10 个事件");
assertEquals(10, count3.get(), "处理器3 应处理 10 个事件");
log.info("串行处理测试通过 - 处理器1: {}, 处理器2: {}, 处理器3: {}",
count1.get(), count2.get(), count3.get());
disruptor.shutdown();
}
/**
* 测试批量发布事件
* <p>
* 验证点:
* <ul>
* <li>批量发布功能正常工作</li>
* <li>所有事件都能被正确处理</li>
* <li>批量发布比单个发布更高效</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#next(int)
* @see com.lmax.disruptor.RingBuffer#publish(long, long)
*/
@Test
@DisplayName("测试批量发布事件")
void testBatchPublish() throws InterruptedException {
AtomicInteger processedCount = new AtomicInteger(0);
int batchSize = 20;
CountDownLatch latch = new CountDownLatch(batchSize);
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
new SleepingWaitStrategy()
);
// 注册事件处理器
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
processedCount.incrementAndGet();
log.info("批量事件处理 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId());
latch.countDown();
});
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 批量申请序列号。注意next(n) 返回的是批次中最大的序列号,
// 起始序列号需要用 hi - (n - 1) 计算
long hi = ringBuffer.next(batchSize);
long lo = hi - (batchSize - 1);
try {
for (long seq = lo; seq <= hi; seq++) {
OrderEvent event = ringBuffer.get(seq);
event.setOrderId(seq - lo + 1);
event.setAmount((seq - lo + 1) * 100.0);
event.setOrderType("BATCH");
event.setTimestamp(System.nanoTime());
}
} finally {
// publish(lo, hi) 一次性发布 [lo, hi] 区间内的全部序列号
ringBuffer.publish(lo, hi);
}
// 等待所有事件被处理
latch.await();
// 验证结果:批次中的每个事件都应恰好被处理一次
// endOfBatch 只是批次结束的标志位,不会导致事件被额外处理
assertEquals(batchSize, processedCount.get(),
"应该处理 " + batchSize + " 个事件,实际处理 " + processedCount.get());
log.info("批量发布测试通过 - 批次大小: {}, 处理数量: {}", batchSize, processedCount.get());
disruptor.shutdown();
}
/**
* 测试不同等待策略的行为
* <p>
* 验证点:
* <ul>
* <li>不同等待策略都能正常工作</li>
* <li>事件能够正确发布和消费</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.WaitStrategy
* @see com.lmax.disruptor.SleepingWaitStrategy
* @see com.lmax.disruptor.YieldingWaitStrategy
* @see com.lmax.disruptor.BlockingWaitStrategy
*/
@Test
@DisplayName("测试不同等待策略")
void testDifferentWaitStrategies() throws InterruptedException {
WaitStrategy[] strategies = {
new SleepingWaitStrategy(),
new YieldingWaitStrategy(),
new BlockingWaitStrategy()
};
for (WaitStrategy strategy : strategies) {
log.info("测试等待策略: {}", strategy.getClass().getSimpleName());
AtomicInteger processedCount = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(5);
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
strategy
);
// 注册事件处理器
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
processedCount.incrementAndGet();
latch.countDown();
});
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 发布事件
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
for (int i = 1; i <= 5; i++) {
producer.publishEvent(i, i * 100.0, "WAIT_STRATEGY");
}
// 等待处理完成
latch.await();
// 验证结果
assertEquals(5, processedCount.get(), "应该处理 5 个事件");
log.info("等待策略 {} 测试通过", strategy.getClass().getSimpleName());
disruptor.shutdown();
}
}
/**
* 测试事件对象的复用
* <p>
* 验证点:
* <ul>
* <li>事件对象被正确复用</li>
* <li>对象引用不变,只修改内容</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.EventFactory
* @see com.lmax.disruptor.RingBuffer#get(long)
*/
@Test
@DisplayName("测试事件对象复用")
void testEventObjectReuse() {
RingBuffer<OrderEvent> ringBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
16,
new SleepingWaitStrategy()
);
// 获取第一个事件对象
long seq1 = ringBuffer.next();
OrderEvent event1 = ringBuffer.get(seq1);
event1.setOrderId(1);
event1.setAmount(100.0);
ringBuffer.publish(seq1);
// 获取相同序列号的事件对象
OrderEvent event1Again = ringBuffer.get(seq1);
// 验证对象引用相同
assertSame(event1, event1Again, "相同序列号应返回同一个对象引用");
// 获取环形位置相同的事件对象
long seq2 = seq1 + 16; // 环形缓冲区大小为16所以 seq2 的索引与 seq1 相同
OrderEvent event2 = ringBuffer.get(seq2);
// 验证对象引用相同(因为是环形复用)
assertSame(event1, event2, "环形位置相同应返回同一个对象引用");
log.info("事件对象复用测试通过 - event1 == event1Again: {}, event1 == event2: {}",
event1 == event1Again, event1 == event2);
}
}
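上面的批量发布测试依赖一个容易被忽略的约定:`next(n)` 返回的是本批次中*最大*的序列号,起始序列号要自行计算。下面用纯算术演示 lo/hi 的推导(示意代码,不依赖 Disruptor"空缓冲区上首批申请"是假设的初始状态):

```java
public class BatchClaimArithmetic {
    public static void main(String[] args) {
        int batchSize = 20;
        // 模拟在空缓冲区上调用 next(batchSize)
        // 首批申请的序列号区间是 [0, batchSize-1]next(n) 返回其中最大的
        long hi = batchSize - 1;        // 19
        long lo = hi - (batchSize - 1); // 0
        System.out.println("lo=" + lo + ", hi=" + hi);
        // 正确的填充与发布区间是 [lo, hi]
        // 若误把 hi 当作起点遍历 [hi, hi+batchSize-1],就会写入未申请的槽位
        for (long seq = lo; seq <= hi; seq++) {
            int index = (int) (seq & 1023); // bufferSize = 1024 时的索引
            if (index < 0 || index >= 1024) {
                throw new AssertionError("索引越界: " + index);
            }
        }
        System.out.println("区间 [" + lo + ", " + hi + "] 共 " + (hi - lo + 1) + " 个序列号");
    }
}
```

发布时同理:`publish(lo, hi)` 的两个参数是区间端点,不是"起点 + 数量"。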


@@ -0,0 +1,13 @@
package com.example.disruptor;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class DisruptorLearnApplicationTests {
@Test
void contextLoads() {
}
}


@@ -0,0 +1,527 @@
package com.example.disruptor;
import com.example.disruptor.event.OrderEvent;
import com.example.disruptor.event.OrderEventFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* RingBuffer 核心功能单元测试
* <p>
* 本测试类深入测试 RingBuffer 的核心实现细节,包括:
* <ul>
* <li>环形数组的数据结构</li>
* <li>序列号到索引的位运算转换</li>
* <li>游标Cursor的管理</li>
* <li>门控序列Gating Sequence的作用</li>
* <li>缓冲区容量的计算</li>
* </ul>
* </p>
*
* <p>测试重点:</p>
* <ul>
* <li>理解 RingBuffer 的内部实现机制</li>
* <li>验证位运算的正确性和效率</li>
* <li>测试边界条件和异常场景</li>
* <li>验证线程安全性</li>
* </ul>
*
* @see com.lmax.disruptor.RingBuffer
* @see com.lmax.disruptor.Sequence
* @see com.lmax.disruptor.Sequencer
*/
@Slf4j
@DisplayName("RingBuffer 核心功能测试")
class RingBufferCoreTest {
/**
* 测试用的缓冲区大小必须是2的幂次方
*/
private static final int BUFFER_SIZE = 16;
/**
* RingBuffer 实例
*/
private RingBuffer<OrderEvent> ringBuffer;
/**
* 测试前初始化
*/
@BeforeEach
void setUp() {
ringBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
BUFFER_SIZE,
new SleepingWaitStrategy()
);
log.info("========== 测试开始 ==========");
}
/**
* 测试后清理
*/
@AfterEach
void tearDown() {
ringBuffer = null;
log.info("========== 测试结束 ==========\n");
}
/**
* 测试 RingBuffer 的基本属性
* <p>
* 验证点:
* <ul>
* <li>缓冲区大小正确</li>
* <li>初始游标为 -1</li>
* <li>初始剩余容量等于缓冲区大小</li>
* <li>缓冲区大小是2的幂次方</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#getBufferSize()
* @see com.lmax.disruptor.RingBuffer#getCursor()
* @see com.lmax.disruptor.RingBuffer#remainingCapacity()
*/
@Test
@DisplayName("测试 RingBuffer 基本属性")
void testRingBufferBasicProperties() {
// 验证缓冲区大小
assertEquals(BUFFER_SIZE, ringBuffer.getBufferSize(),
"缓冲区大小应为 " + BUFFER_SIZE);
// 验证缓冲区大小是2的幂次方
assertTrue(isPowerOfTwo(BUFFER_SIZE),
"缓冲区大小必须是2的幂次方");
// 验证初始游标
assertEquals(-1, ringBuffer.getCursor(),
"初始游标应为 -1");
// 验证初始剩余容量
assertEquals(BUFFER_SIZE, ringBuffer.remainingCapacity(),
"初始剩余容量应为 " + BUFFER_SIZE);
log.info("RingBuffer 基本属性测试通过 - 大小: {}, 游标: {}, 剩余容量: {}",
ringBuffer.getBufferSize(), ringBuffer.getCursor(), ringBuffer.remainingCapacity());
}
/**
* 测试序列号到索引的位运算转换
* <p>
* RingBuffer 使用位运算将序列号转换为数组索引:
* <pre>
* index = sequence & (bufferSize - 1)
* </pre>
* 这种方式比取模运算sequence % bufferSize更高效。
* </p>
*
* <p>验证点:</p>
* <ul>
* <li>位运算结果正确</li>
* <li>索引在有效范围内</li>
* <li>环形循环正确</li>
* </ul>
*
* @see com.lmax.disruptor.RingBuffer#get(long)
*/
@Test
@DisplayName("测试序列号到索引的位运算转换")
void testSequenceToIndexConversion() {
// 测试多个序列号
long[] testSequences = {0, 1, 15, 16, 17, 31, 32, 33, 100, 101};
for (long sequence : testSequences) {
// 使用位运算计算索引
int index = (int) (sequence & (BUFFER_SIZE - 1));
// 验证索引在有效范围内
assertTrue(index >= 0 && index < BUFFER_SIZE,
"索引应在 [0, " + (BUFFER_SIZE - 1) + "] 范围内,实际为: " + index);
// 验证位运算与取模运算结果一致
int modIndex = (int) (sequence % BUFFER_SIZE);
assertEquals(modIndex, index,
"位运算与取模运算结果应一致: " + sequence + " & " + (BUFFER_SIZE - 1));
log.info("序列号: {}, 位运算索引: {}, 取模索引: {}",
sequence, index, modIndex);
}
log.info("序列号到索引转换测试通过");
}
/**
* 测试游标Cursor的管理
* <p>
* 游标指向当前已发布的最大序列号。
* </p>
*
* <p>验证点:</p>
* <ul>
* <li>游标随着发布递增</li>
* <li>游标值等于最后发布的序列号</li>
* <li>游标不会回退</li>
* </ul>
*
* @see com.lmax.disruptor.RingBuffer#getCursor()
*/
@Test
@DisplayName("测试游标管理")
void testCursorManagement() {
// 初始游标
assertEquals(-1, ringBuffer.getCursor(), "初始游标应为 -1");
// 发布事件并验证游标
for (long i = 0; i < 10; i++) {
long sequence = ringBuffer.next();
ringBuffer.publish(sequence);
assertEquals(i, ringBuffer.getCursor(),
"发布序列号 " + i + " 后,游标应为 " + i);
}
// 验证游标不会回退
long cursorBefore = ringBuffer.getCursor();
long nextSequence = ringBuffer.next();
ringBuffer.publish(nextSequence);
assertTrue(ringBuffer.getCursor() > cursorBefore,
"游标应该递增,不应回退");
log.info("游标管理测试通过 - 当前游标: {}", ringBuffer.getCursor());
}
/**
* 测试剩余容量的计算
* <p>
* 剩余容量表示当前可用的缓冲区槽数量。
* </p>
*
* <p>验证点:</p>
* <ul>
* <li>初始剩余容量等于缓冲区大小</li>
* <li>发布事件后剩余容量减少</li>
* <li>剩余容量不会小于0</li>
* </ul>
*
* @see com.lmax.disruptor.RingBuffer#remainingCapacity()
*/
@Test
@DisplayName("测试剩余容量计算")
void testRemainingCapacity() {
// 创建新的 RingBuffer 进行测试,避免受之前测试影响
RingBuffer<OrderEvent> testRingBuffer = RingBuffer.create(
ProducerType.SINGLE,
new OrderEventFactory(),
BUFFER_SIZE,
new SleepingWaitStrategy()
);
// 获取初始剩余容量
long initialCapacity = testRingBuffer.remainingCapacity();
assertEquals(BUFFER_SIZE, initialCapacity, "初始剩余容量应为 " + BUFFER_SIZE);
// 添加一个虚拟的 gating sequence 来模拟消费者
Sequence gatingSequence = new Sequence(-1);
testRingBuffer.addGatingSequences(gatingSequence);
// 发布事件
for (int i = 1; i <= 5; i++) {
long sequence = testRingBuffer.next();
testRingBuffer.publish(sequence);
}
// 验证剩余容量减少(由于有消费者但未消费,容量会减少)
long finalCapacity = testRingBuffer.remainingCapacity();
assertTrue(finalCapacity < initialCapacity,
"发布事件后剩余容量应该减少,初始: " + initialCapacity + ", 最终: " + finalCapacity);
log.info("剩余容量测试通过 - 初始容量: {}, 最终容量: {}", initialCapacity, finalCapacity);
}
/**
* 测试环形循环特性
* <p>
* 当序列号超过缓冲区大小时索引会循环回到0。
* </p>
*
* <p>验证点:</p>
* <ul>
* <li>序列号持续递增</li>
* <li>索引在 [0, bufferSize-1] 范围内循环</li>
* <li>索引计算正确</li>
* </ul>
*
* @see com.lmax.disruptor.RingBuffer#get(long)
*/
@Test
@DisplayName("测试环形循环特性")
void testRingBufferCircularity() {
// 发布超过缓冲区容量的事件
int totalEvents = BUFFER_SIZE * 2 + 5;
for (long i = 0; i < totalEvents; i++) {
long sequence = ringBuffer.next();
OrderEvent event = ringBuffer.get(sequence);
event.setOrderId(i);
ringBuffer.publish(sequence);
// 验证索引循环
int index = (int) (sequence & (BUFFER_SIZE - 1));
assertTrue(index >= 0 && index < BUFFER_SIZE,
"索引应在有效范围内: " + index);
// 验证序列号持续递增
assertEquals(i, ringBuffer.getCursor(),
"序列号应为 " + i);
}
// 验证特定序列号的索引
assertEquals(0, (int) (BUFFER_SIZE & (BUFFER_SIZE - 1)),
"序列号 " + BUFFER_SIZE + " 的索引应为 0");
assertEquals(1, (int) ((BUFFER_SIZE + 1) & (BUFFER_SIZE - 1)),
"序列号 " + (BUFFER_SIZE + 1) + " 的索引应为 1");
assertEquals(0, (int) ((BUFFER_SIZE * 2) & (BUFFER_SIZE - 1)),
"序列号 " + (BUFFER_SIZE * 2) + " 的索引应为 0");
log.info("环形循环测试通过 - 总事件数: {}, 当前游标: {}",
totalEvents, ringBuffer.getCursor());
}
/**
* 测试事件对象的获取和设置
* <p>
* 验证点:
* <ul>
* <li>能够正确获取事件对象</li>
* <li>能够正确设置事件属性</li>
* <li>事件对象不为 null</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#get(long)
*/
@Test
@DisplayName("测试事件对象获取和设置")
void testEventObjectGetAndSet() {
// 获取事件对象
long sequence = ringBuffer.next();
OrderEvent event = ringBuffer.get(sequence);
// 验证事件对象不为 null
assertNotNull(event, "事件对象不应为 null");
// 设置事件属性
event.setOrderId(123);
event.setAmount(456.78);
event.setOrderType("TEST");
event.setTimestamp(System.nanoTime());
// 验证属性设置正确
assertEquals(123, event.getOrderId(), "订单ID应为 123");
assertEquals(456.78, event.getAmount(), 0.001, "金额应为 456.78");
assertEquals("TEST", event.getOrderType(), "订单类型应为 TEST");
// 发布事件
ringBuffer.publish(sequence);
// 再次获取相同序列号的事件对象
OrderEvent eventAgain = ringBuffer.get(sequence);
// 验证是同一个对象引用
assertSame(event, eventAgain, "相同序列号应返回同一个对象引用");
// 验证数据一致
assertEquals(123, eventAgain.getOrderId(), "订单ID应为 123");
assertEquals(456.78, eventAgain.getAmount(), 0.001, "金额应为 456.78");
log.info("事件对象获取和设置测试通过");
}
/**
* 测试批量获取序列号
* <p>
* 验证点:
* <ul>
* <li>能够批量获取连续的序列号</li>
* <li>序列号连续且正确</li>
* <li>批量发布功能正常</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.RingBuffer#next(int)
* @see com.lmax.disruptor.RingBuffer#publish(long, long)
*/
@Test
@DisplayName("测试批量获取序列号")
void testBatchSequenceAllocation() {
int batchSize = 5;
// 批量获取序列号next(n) 返回的是批次中最大的序列号
long hi = ringBuffer.next(batchSize);
long lo = hi - (batchSize - 1);
// setUp 会为每个测试创建新的 RingBuffer因此首个批次从序列号 0 开始
assertEquals(0, lo, "首个批次的起始序列号应为 0");
assertEquals(batchSize - 1, hi, "首个批次的最大序列号应为 " + (batchSize - 1));
// 填充事件
for (long seq = lo; seq <= hi; seq++) {
OrderEvent event = ringBuffer.get(seq);
event.setOrderId(seq - lo + 1);
event.setAmount((seq - lo + 1) * 100.0);
}
// 批量发布publish(lo, hi) 发布 [lo, hi] 区间内的全部序列号
ringBuffer.publish(lo, hi);
// 验证游标指向批次中最后发布的序列号
assertEquals(hi, ringBuffer.getCursor(), "游标应为 " + hi);
// 再次批量获取,序列号应紧接上一批次连续递增
long nextHi = ringBuffer.next(batchSize);
assertEquals(hi + batchSize, nextHi,
"下一批次的最大序列号应为 " + (hi + batchSize));
log.info("批量序列号分配测试通过 - 批次大小: {}", batchSize);
}
/**
* 测试缓冲区大小必须是2的幂次方
* <p>
* 验证点:
* <ul>
* <li>能够正确判断一个数是否为2的幂次方</li>
* <li>位运算判断方法正确</li>
* </ul>
* </p>
*/
@Test
@DisplayName("测试2的幂次方判断")
void testPowerOfTwo() {
// 测试2的幂次方
int[] powersOfTwo = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
for (int n : powersOfTwo) {
assertTrue(isPowerOfTwo(n), n + " 应该是2的幂次方");
}
// 测试非2的幂次方
int[] notPowersOfTwo = {0, 3, 5, 6, 7, 9, 10, 15, 17, 100, 1000};
for (int n : notPowersOfTwo) {
assertFalse(isPowerOfTwo(n), n + " 不应该是2的幂次方");
}
log.info("2的幂次方判断测试通过");
}
/**
* 测试序列号的边界情况
* <p>
* 验证点:
* <ul>
* <li>序列号0的处理</li>
* <li>序列号Long.MAX_VALUE附近的处理</li>
* <li>序列号溢出后的处理</li>
* </ul>
* </p>
*/
@Test
@DisplayName("测试序列号边界情况")
void testSequenceBoundaryConditions() {
// 测试序列号0
long seq0 = ringBuffer.next();
assertEquals(0, seq0, "第一个序列号应为 0");
ringBuffer.publish(seq0);
// 测试序列号1
long seq1 = ringBuffer.next();
assertEquals(1, seq1, "第二个序列号应为 1");
ringBuffer.publish(seq1);
// 测试接近Long.MAX_VALUE的序列号
// 注意:实际应用中不会达到这个值,但测试边界情况很重要
long largeSequence = Long.MAX_VALUE - 10;
int index = (int) (largeSequence & (BUFFER_SIZE - 1));
assertTrue(index >= 0 && index < BUFFER_SIZE,
"大序列号的索引应在有效范围内");
log.info("序列号边界情况测试通过");
}
/**
* 测试事件对象的预分配
* <p>
* RingBuffer 在初始化时会预分配所有事件对象。
* </p>
*
* <p>验证点:</p>
* <ul>
* <li>所有位置的事件对象都已预分配</li>
* <li>事件对象不为 null</li>
* <li>事件对象可以正常访问</li>
* </ul>
*
* @see com.lmax.disruptor.EventFactory
*/
@Test
@DisplayName("测试事件对象预分配")
void testEventPreallocation() {
// 访问所有位置的事件对象
for (int i = 0; i < BUFFER_SIZE; i++) {
OrderEvent event = ringBuffer.get(i);
assertNotNull(event, "位置 " + i + " 的事件对象不应为 null");
}
log.info("事件对象预分配测试通过 - 所有位置的对象都已预分配");
}
/**
* 测试序列号的连续性
* <p>
* 验证点:
* <ul>
* <li>next() 返回的序列号连续递增</li>
* <li>没有序列号重复或跳跃</li>
* </ul>
* </p>
*/
@Test
@DisplayName("测试序列号连续性")
void testSequenceContinuity() {
long expectedSequence = 0;
int iterations = 100;
for (int i = 0; i < iterations; i++) {
long sequence = ringBuffer.next();
assertEquals(expectedSequence, sequence,
"序列号应为 " + expectedSequence + ",实际为 " + sequence);
ringBuffer.publish(sequence);
expectedSequence++;
}
assertEquals(iterations, expectedSequence,
"最终序列号应为 " + iterations);
log.info("序列号连续性测试通过 - 迭代次数: {}", iterations);
}
/**
* 辅助方法判断一个数是否为2的幂次方
* <p>
* 使用位运算n & (n-1) == 0
* </p>
*
* @param n 要判断的数字
* @return 如果是2的幂次方返回 true否则返回 false
*/
private boolean isPowerOfTwo(int n) {
return n > 0 && (n & (n - 1)) == 0;
}
}
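作为补充,下面的独立示例验证了上面测试依赖的两个恒等关系:当且仅当 bufferSize 是2的幂时`seq & (size - 1)` 与 `seq % size` 等价2的幂判断 `n > 0 && (n & (n - 1)) == 0` 与 `Integer.bitCount(n) == 1` 一致(示意代码,类名为假设的名字):

```java
public class PowerOfTwoMaskDemo {
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    public static void main(String[] args) {
        // 2的幂位运算与取模结果一致
        int size = 16;
        for (long seq = 0; seq < 100; seq++) {
            if ((seq & (size - 1)) != (seq % size)) {
                throw new AssertionError("2的幂时两者应一致: " + seq);
            }
        }
        // 非2的幂掩码中出现 0 位,位运算不再等价于取模
        int badSize = 12;
        long seq = 13;
        System.out.println("13 & 11 = " + (seq & (badSize - 1))); // 9
        System.out.println("13 % 12 = " + (seq % badSize));       // 1
        // 判断方法与 bitCount 等价
        for (int n = 1; n <= 1024; n++) {
            if (isPowerOfTwo(n) != (Integer.bitCount(n) == 1)) {
                throw new AssertionError("判断不一致: " + n);
            }
        }
        System.out.println("验证通过");
    }
}
```

这也解释了 Disruptor 强制缓冲区大小为2的幂的原因只有这样才能用一次按位与代替开销更大的取模。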


@@ -0,0 +1,330 @@
package com.example.disruptor;
import com.example.disruptor.event.OrderEvent;
import com.example.disruptor.event.OrderEventFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.junit.jupiter.api.Assertions;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* 等待策略性能对比测试
* <p>
* 本测试类对比不同等待策略的性能特征,包括:
* <ul>
* <li>BlockingWaitStrategy - 使用锁和条件变量CPU使用低延迟高</li>
* <li>SleepingWaitStrategy - 自旋后睡眠平衡性能和CPU使用</li>
* <li>YieldingWaitStrategy - 自旋后让出CPU低延迟高CPU使用</li>
* <li>BusySpinWaitStrategy - 持续自旋最低延迟极高CPU使用</li>
* <li>PhasedBackoffWaitStrategy - 自适应等待策略</li>
* </ul>
* </p>
*
* <p>性能指标:</p>
* <ul>
* <li>吞吐量Throughput - 每秒处理的事件数量</li>
* <li>延迟Latency - 事件从发布到消费的时间</li>
* <li>CPU使用率 - 等待期间CPU的使用情况</li>
* </ul>
*
* @see com.lmax.disruptor.WaitStrategy
* @see com.lmax.disruptor.BlockingWaitStrategy
* @see com.lmax.disruptor.SleepingWaitStrategy
* @see com.lmax.disruptor.YieldingWaitStrategy
* @see com.lmax.disruptor.BusySpinWaitStrategy
* @see com.lmax.disruptor.PhasedBackoffWaitStrategy
*/
@Slf4j
@DisplayName("等待策略性能对比测试")
class WaitStrategyPerformanceTest {
/**
* 缓冲区大小
*/
private static final int BUFFER_SIZE = 1024;
/**
* 测试事件数量
*/
private static final int EVENT_COUNT = 100_000;
/**
* 线程工厂
*/
private ThreadFactory threadFactory;
/**
* 线程池
*/
private ExecutorService executorService;
/**
* 测试前初始化
*/
@BeforeEach
void setUp() {
AtomicInteger threadNumber = new AtomicInteger(1);
threadFactory = r -> {
Thread thread = new Thread(r);
thread.setName("perf-test-" + threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
};
executorService = Executors.newCachedThreadPool(threadFactory);
log.info("========== 性能测试开始 ==========");
}
/**
* 测试后清理
*/
@AfterEach
void tearDown() {
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
}
log.info("========== 性能测试结束 ==========\n");
}
/**
* 测试 BlockingWaitStrategy
* <p>
* 特点:
* <ul>
* <li>使用锁和条件变量</li>
* <li>CPU使用率最低</li>
* <li>延迟较高</li>
* <li>适用于对延迟不敏感的场景</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.BlockingWaitStrategy
*/
@Test
@DisplayName("测试 BlockingWaitStrategy 性能")
void testBlockingWaitStrategyPerformance() throws InterruptedException {
runPerformanceTest("BlockingWaitStrategy", new BlockingWaitStrategy());
}
/**
* 测试 SleepingWaitStrategy
* <p>
* 特点:
* <ul>
* <li>默认重试200次前约100次纯自旋</li>
* <li>之后约100次调用 Thread.yield()</li>
* <li>重试耗尽后循环调用 LockSupport.parkNanos() 短暂休眠</li>
* <li>平衡性能和CPU使用</li>
* <li>适用于大多数场景</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.SleepingWaitStrategy
*/
@Test
@DisplayName("测试 SleepingWaitStrategy 性能")
void testSleepingWaitStrategyPerformance() throws InterruptedException {
runPerformanceTest("SleepingWaitStrategy", new SleepingWaitStrategy());
}
/**
* 测试 YieldingWaitStrategy
* <p>
* 特点:
* <ul>
* <li>先自旋100次</li>
* <li>然后调用 Thread.yield()</li>
* <li>低延迟</li>
* <li>CPU使用率高</li>
* <li>适用于低延迟场景</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.YieldingWaitStrategy
*/
@Test
@DisplayName("测试 YieldingWaitStrategy 性能")
void testYieldingWaitStrategyPerformance() throws InterruptedException {
runPerformanceTest("YieldingWaitStrategy", new YieldingWaitStrategy());
}
/**
* 测试 BusySpinWaitStrategy
* <p>
* 特点:
* <ul>
* <li>持续自旋等待</li>
* <li>最低延迟</li>
* <li>极高CPU使用率</li>
* <li>仅适用于超低延迟场景</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.BusySpinWaitStrategy
*/
@Test
@DisplayName("测试 BusySpinWaitStrategy 性能")
void testBusySpinWaitStrategyPerformance() throws InterruptedException {
runPerformanceTest("BusySpinWaitStrategy", new BusySpinWaitStrategy());
}
/**
* 测试 PhasedBackoffWaitStrategy
* <p>
* 特点:
* <ul>
* <li>自适应等待策略</li>
* <li>在自旋和睡眠之间自动切换</li>
* <li>适用于负载不稳定的场景</li>
* </ul>
* </p>
*
* @see com.lmax.disruptor.PhasedBackoffWaitStrategy
*/
@Test
@DisplayName("测试 PhasedBackoffWaitStrategy 性能")
void testPhasedBackoffWaitStrategyPerformance() throws InterruptedException {
WaitStrategy strategy = new PhasedBackoffWaitStrategy(
1, 1000, TimeUnit.NANOSECONDS,
new SleepingWaitStrategy()
);
runPerformanceTest("PhasedBackoffWaitStrategy", strategy);
}
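
    /**
     * 测试 TimeoutBlockingWaitStrategy(补充示例)
     * <p>
     * 说明:本方法为补充演示,假设测试类已 import
     * com.lmax.disruptor.TimeoutBlockingWaitStrategy。
     * 该策略在 BlockingWaitStrategy 基础上增加了等待超时:
     * 消费者等待超过指定时间仍无新事件时会收到一次超时通知
     * (仅当处理器实现了 TimeoutHandler 时才会被回调),
     * 适合需要定期执行空闲逻辑(如心跳、刷盘)的场景。
     * </p>
     *
     * @see com.lmax.disruptor.TimeoutBlockingWaitStrategy
     */
    @Test
    @DisplayName("测试 TimeoutBlockingWaitStrategy 性能")
    void testTimeoutBlockingWaitStrategyPerformance() throws InterruptedException {
        // 等待超过 100 毫秒仍无新事件时触发一次超时通知
        runPerformanceTest("TimeoutBlockingWaitStrategy",
                new TimeoutBlockingWaitStrategy(100, TimeUnit.MILLISECONDS));
    }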
/**
* 运行性能测试
* <p>
* 测试流程:
* <ol>
* <li>创建 Disruptor 实例</li>
* <li>注册消费者</li>
* <li>发布指定数量的事件</li>
* <li>等待所有事件被处理</li>
* <li>计算性能指标</li>
* </ol>
* </p>
*
* @param strategyName 策略名称
* @param strategy 等待策略
*/
private void runPerformanceTest(String strategyName, WaitStrategy strategy) throws InterruptedException {
log.info("开始测试 {} 性能", strategyName);
AtomicInteger processedCount = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(EVENT_COUNT);
AtomicLong totalLatency = new AtomicLong(0);
// 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
BUFFER_SIZE,
threadFactory,
ProducerType.SINGLE,
strategy
);
// 注册消费者
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
// 计算延迟
long latency = System.nanoTime() - event.getTimestamp();
totalLatency.addAndGet(latency);
processedCount.incrementAndGet();
latch.countDown();
});
// 启动 Disruptor
RingBuffer<OrderEvent> ringBuffer = disruptor.start();
// 开始计时
long startTime = System.nanoTime();
// 发布事件
for (int i = 0; i < EVENT_COUNT; i++) {
long sequence = ringBuffer.next();
try {
OrderEvent event = ringBuffer.get(sequence);
event.setOrderId(i);
event.setAmount(i * 100.0);
event.setOrderType("PERF_TEST");
event.setTimestamp(System.nanoTime());
} finally {
ringBuffer.publish(sequence);
}
}
        // 等待所有事件被处理(设置超时,避免某个策略异常时测试永久挂起)
        boolean completed = latch.await(30, TimeUnit.SECONDS);
        Assertions.assertTrue(completed, strategyName + " 应在超时前处理完所有事件");
// 结束计时
long endTime = System.nanoTime();
long durationNanos = endTime - startTime;
double durationSeconds = durationNanos / 1_000_000_000.0;
// 计算性能指标
long throughput = (long) (EVENT_COUNT / durationSeconds);
double avgLatencyMicros = (totalLatency.get() / (double) EVENT_COUNT) / 1000.0;
// 输出结果
log.info("========== {} 性能测试结果 ==========", strategyName);
log.info("事件数量: {}", EVENT_COUNT);
        log.info("总耗时: {} 秒", String.format("%.3f", durationSeconds));
        log.info("吞吐量: {} 事件/秒", throughput);
        log.info("平均延迟: {} 微秒", String.format("%.3f", avgLatencyMicros));
log.info("==========================================");
// 验证所有事件都被处理
Assertions.assertEquals(EVENT_COUNT, processedCount.get(),
strategyName + " 应该处理所有事件");
disruptor.shutdown();
}

/**
* 对比所有等待策略的性能
* <p>
* 本测试一次性运行所有等待策略,便于对比性能差异。
* </p>
*/
@Test
@DisplayName("对比所有等待策略性能")
void testCompareAllWaitStrategies() throws InterruptedException {
log.info("========== 开始对比所有等待策略性能 ==========");
WaitStrategy[] strategies = {
new BlockingWaitStrategy(),
new SleepingWaitStrategy(),
new YieldingWaitStrategy(),
new BusySpinWaitStrategy(),
new PhasedBackoffWaitStrategy(1, 1000, TimeUnit.NANOSECONDS, new SleepingWaitStrategy())
};
String[] strategyNames = {
"BlockingWaitStrategy",
"SleepingWaitStrategy",
"YieldingWaitStrategy",
"BusySpinWaitStrategy",
"PhasedBackoffWaitStrategy"
};
for (int i = 0; i < strategies.length; i++) {
runPerformanceTest(strategyNames[i], strategies[i]);
log.info("");
}
log.info("========== 所有等待策略性能对比完成 ==========");
}
}