- 基于 JDK 21 + Spring Boot 3.5.0 - 使用 Disruptor 3.4.4 - 包含详细的中文注释和 @see 引用 - 提供 RingBuffer 核心功能演示 - 27 个单元测试,全部通过 - 添加 README 文档 - 配置 .gitignore 排除非项目文件
299 lines
7.5 KiB
Markdown
299 lines
7.5 KiB
Markdown
# Disruptor 学习项目
|
||
|
||
基于 JDK 21 + Spring Boot 3 的 LMAX Disruptor 高性能学习项目,包含详细的中文注释和单元测试,便于深入学习和理解 Disruptor 框架的核心原理。
|
||
|
||
## 项目简介
|
||
|
||
本项目是一个完整的 Disruptor 学习示例,演示了 Disruptor 框架的核心功能和最佳实践。通过实际代码和详细注释,帮助开发者理解:
|
||
|
||
- RingBuffer 环形缓冲区的工作原理
|
||
- 事件发布-订阅模式
|
||
- 序列号(Sequence)机制
|
||
- 等待策略(WaitStrategy)的选择
|
||
- 多消费者并行/串行处理
|
||
|
||
## 技术栈
|
||
|
||
- **JDK**: 21
|
||
- **Spring Boot**: 3.5.0
|
||
- **Disruptor**: 3.4.4
|
||
- **构建工具**: Maven
|
||
- **测试框架**: JUnit 5
|
||
|
||
## 项目结构
|
||
|
||
```
|
||
disruptor-learn/
|
||
├── src/main/java/com/example/disruptor/
|
||
│ ├── event/
|
||
│ │ ├── OrderEvent.java # 订单事件类
|
||
│ │ └── OrderEventFactory.java # 事件工厂
|
||
│ ├── handler/
|
||
│ │ └── OrderEventHandler.java # 事件处理器
|
||
│ ├── producer/
|
||
│ │ └── OrderEventProducer.java # 事件生产者
|
||
│ ├── config/
|
||
│ │ └── DisruptorConfig.java # Disruptor 配置
|
||
│ ├── controller/
|
||
│ │ └── OrderController.java # REST API 控制器
|
||
│ ├── ringbuffer/
|
||
│ │ └── RingBufferDemo.java # RingBuffer 核心演示
|
||
│ └── DisruptorLearnApplication.java
|
||
├── src/test/java/com/example/disruptor/
|
||
│ ├── DisruptorCoreFunctionalityTest.java # Disruptor 核心功能测试
|
||
│ ├── RingBufferCoreTest.java # RingBuffer 核心测试
|
||
│ └── WaitStrategyPerformanceTest.java # 等待策略性能测试
|
||
└── pom.xml
|
||
```
|
||
|
||
## 快速开始
|
||
|
||
### 环境要求
|
||
|
||
- JDK 21 或更高版本
|
||
- Maven 3.6 或更高版本
|
||
|
||
### 编译项目
|
||
|
||
```bash
|
||
mvn clean compile
|
||
```
|
||
|
||
### 运行测试
|
||
|
||
```bash
|
||
mvn test
|
||
```
|
||
|
||
### 启动应用
|
||
|
||
```bash
|
||
mvn spring-boot:run
|
||
```
|
||
|
||
应用启动后,访问 http://localhost:8080
|
||
|
||
## 核心功能
|
||
|
||
### 1. Disruptor 核心功能
|
||
|
||
#### 事件发布和消费
|
||
|
||
```java
|
||
// 发布事件
|
||
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
|
||
producer.publishEvent(orderId, amount, orderType);
|
||
|
||
// 消费事件
|
||
disruptor.handleEventsWith(new OrderEventHandler("订单处理器"));
|
||
```
|
||
|
||
#### 多消费者处理
|
||
|
||
```java
|
||
// 并行处理
|
||
disruptor.handleEventsWith(handler1, handler2, handler3);
|
||
|
||
// 串行处理
|
||
disruptor.handleEventsWith(handler1).then(handler2).then(handler3);
|
||
```
|
||
|
||
#### 批量发布
|
||
|
||
```java
|
||
// 批量获取序列号
|
||
long startSequence = ringBuffer.next(batchSize);
|
||
|
||
// 填充事件
|
||
for (int i = 0; i < batchSize; i++) {
|
||
OrderEvent event = ringBuffer.get(startSequence + i);
|
||
// 填充数据
|
||
}
|
||
|
||
// 批量发布
|
||
ringBuffer.publish(startSequence, batchSize);
|
||
```
|
||
|
||
### 2. RingBuffer 核心功能
|
||
|
||
#### 环形数据结构
|
||
|
||
RingBuffer 使用固定大小的环形数组,通过位运算实现高效的索引计算:
|
||
|
||
```java
|
||
// 序列号到索引的转换(位运算)
|
||
int index = (int) (sequence & (bufferSize - 1));
|
||
```
|
||
|
||
#### 序列号机制
|
||
|
||
```java
|
||
// 获取序列号
|
||
long sequence = ringBuffer.next();
|
||
|
||
// 获取事件对象
|
||
OrderEvent event = ringBuffer.get(sequence);
|
||
|
||
// 发布事件
|
||
ringBuffer.publish(sequence);
|
||
```
|
||
|
||
## API 文档
|
||
|
||
### 创建单个订单
|
||
|
||
```bash
|
||
POST /api/orders
|
||
Content-Type: application/json
|
||
|
||
{
|
||
"amount": 100.50,
|
||
"orderType": "NORMAL"
|
||
}
|
||
```
|
||
|
||
### 批量创建订单
|
||
|
||
```bash
|
||
POST /api/orders/batch
|
||
Content-Type: application/json
|
||
|
||
{
|
||
"count": 10,
|
||
"amount": 50.0,
|
||
"orderType": "BATCH"
|
||
}
|
||
```
|
||
|
||
### 查看系统状态
|
||
|
||
```bash
|
||
GET /api/orders/status
|
||
```
|
||
|
||
## 等待策略
|
||
|
||
Disruptor 提供多种等待策略,不同的策略在性能和 CPU 使用率之间有不同的权衡:
|
||
|
||
| 策略 | CPU 使用 | 延迟 | 吞吐量 | 适用场景 |
|
||
|------|----------|------|--------|----------|
|
||
| BlockingWaitStrategy | 低 | 高 | 高 | 通用场景 |
|
||
| SleepingWaitStrategy | 低 | 中 | 高 | 平衡场景 |
|
||
| YieldingWaitStrategy | 高 | 低 | 最高 | 低延迟场景 |
|
||
| BusySpinWaitStrategy | 极高 | 最低 | 最高 | 超低延迟 |
|
||
| PhasedBackoffWaitStrategy | 自适应 | 自适应 | 高 | 自适应场景 |
|
||
|
||
### 选择建议
|
||
|
||
- **通用场景**: 使用 `SleepingWaitStrategy`,在性能和 CPU 使用之间取得平衡
|
||
- **低延迟场景**: 使用 `YieldingWaitStrategy`,牺牲 CPU 使用率换取低延迟
|
||
- **超低延迟场景**: 使用 `BusySpinWaitStrategy`,适用于对延迟极其敏感的场景
|
||
- **资源受限场景**: 使用 `BlockingWaitStrategy`,最小化 CPU 使用
|
||
|
||
## 测试说明
|
||
|
||
项目包含 27 个单元测试,覆盖以下方面:
|
||
|
||
### DisruptorCoreFunctionalityTest
|
||
|
||
- Disruptor 初始化测试
|
||
- 事件发布和消费测试
|
||
- 序列号机制测试
|
||
- 环形特性测试
|
||
- 多消费者并行处理测试
|
||
- 消费者串行处理测试
|
||
- 批量发布测试
|
||
- 不同等待策略测试
|
||
- 事件对象复用测试
|
||
|
||
### RingBufferCoreTest
|
||
|
||
- RingBuffer 基本属性测试
|
||
- 序列号到索引转换测试
|
||
- 游标管理测试
|
||
- 剩余容量计算测试
|
||
- 环形循环特性测试
|
||
- 事件对象获取和设置测试
|
||
- 批量序列号分配测试
|
||
- 2的幂次方判断测试
|
||
- 序列号边界情况测试
|
||
- 事件对象预分配测试
|
||
- 序列号连续性测试
|
||
|
||
### WaitStrategyPerformanceTest
|
||
|
||
- BlockingWaitStrategy 性能测试
|
||
- SleepingWaitStrategy 性能测试
|
||
- YieldingWaitStrategy 性能测试
|
||
- BusySpinWaitStrategy 性能测试
|
||
- PhasedBackoffWaitStrategy 性能测试
|
||
- 所有等待策略性能对比
|
||
|
||
运行测试:
|
||
|
||
```bash
|
||
# 运行所有测试
|
||
mvn test
|
||
|
||
# 运行特定测试类
|
||
mvn test -Dtest=DisruptorCoreFunctionalityTest
|
||
|
||
# 运行特定测试方法
|
||
mvn test -Dtest=RingBufferCoreTest#testSequenceMechanism
|
||
```
|
||
|
||
## 核心概念
|
||
|
||
### RingBuffer
|
||
|
||
RingBuffer 是 Disruptor 的核心数据结构,是一个固定大小的环形数组。它的特点:
|
||
|
||
- **预分配内存**: 在初始化时预分配所有事件对象,避免运行时 GC
|
||
- **环形结构**: 使用位运算实现高效的索引计算
|
||
- **无锁设计**: 通过序列号机制实现线程安全,无需使用锁
|
||
|
||
### Sequence
|
||
|
||
Sequence 是一个原子递增的序列号,用于:
|
||
|
||
- 追踪事件在缓冲区中的位置
|
||
- 协调生产者和消费者
|
||
- 实现无锁并发
|
||
|
||
### WaitStrategy
|
||
|
||
WaitStrategy 决定消费者如何等待新事件,是 Disruptor 性能调优的关键参数。
|
||
|
||
### EventFactory
|
||
|
||
EventFactory 用于在 Disruptor 初始化时预分配事件对象。
|
||
|
||
### EventHandler
|
||
|
||
EventHandler 是消费者处理事件的接口,每个 EventHandler 在独立的线程中运行。
|
||
|
||
## 学习资源
|
||
|
||
- [Disruptor 官方文档](https://github.com/lmax-exchange/disruptor/wiki)
|
||
- [Disruptor 源码](https://github.com/lmax-exchange/disruptor)
|
||
- [Disruptor 性能测试](https://github.com/lmax-exchange/disruptor/wiki/Performance-Results)
|
||
- [LMAX Disruptor 论文](https://lmax-exchange.github.io/disruptor/disruptor.html)
|
||
|
||
## 最佳实践
|
||
|
||
1. **缓冲区大小**: 必须是 2 的幂次方(如 1024, 2048, 4096)
|
||
2. **事件对象**: 提供清晰的 `clear()` 方法,在事件被复用前清理数据
|
||
3. **等待策略**: 根据实际场景选择合适的等待策略
|
||
4. **批量处理**: 使用 `endOfBatch` 标志进行批量操作优化
|
||
5. **避免阻塞**: 在 EventHandler 中避免执行阻塞操作
|
||
|
||
## 注意事项
|
||
|
||
- 所有代码都包含详细的中文注释和 `@see` 引用
|
||
- 测试覆盖了核心功能和边界场景
|
||
- 项目使用 JDK 21 和 Spring Boot 3.5.0
|
||
- 确保本地已安装 Maven(项目不包含 Maven Wrapper)
|
||
|
||
## 许可证
|
||
|
||
本项目仅用于学习目的,遵循 Disruptor 的 Apache 2.0 许可证。 |