commit c9600a1231dd3889a32dbd3a46c8508da36d610e Author: F嘉阳 <1186032234@qq.com> Date: Wed Dec 31 10:41:35 2025 +0800 feat: 添加 Disruptor 学习项目 - 基于 JDK 21 + Spring Boot 3.5.0 - 使用 Disruptor 3.4.4 - 包含详细的中文注释和 @see 引用 - 提供 RingBuffer 核心功能演示 - 27 个单元测试,全部通过 - 添加 README 文档 - 配置 .gitignore 排除非项目文件 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da70562 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# IDE +.idea/ +*.iml +.vscode/ +*.swp +*.swo +*~ + +# Maven +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db + +# Spring Boot +spring-boot-*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..6ff11f2 --- /dev/null +++ b/README.md @@ -0,0 +1,299 @@ +# Disruptor 学习项目 + +基于 JDK 21 + Spring Boot 3 的 LMAX Disruptor 高性能学习项目,包含详细的中文注释和单元测试,便于深入学习和理解 Disruptor 框架的核心原理。 + +## 项目简介 + +本项目是一个完整的 Disruptor 学习示例,演示了 Disruptor 框架的核心功能和最佳实践。通过实际代码和详细注释,帮助开发者理解: + +- RingBuffer 环形缓冲区的工作原理 +- 事件发布-订阅模式 +- 序列号(Sequence)机制 +- 等待策略(WaitStrategy)的选择 +- 多消费者并行/串行处理 + +## 技术栈 + +- **JDK**: 21 +- **Spring Boot**: 3.5.0 +- **Disruptor**: 3.4.4 +- **构建工具**: Maven +- **测试框架**: JUnit 5 + +## 项目结构 + +``` +disruptor-learn/ +├── src/main/java/com/example/disruptor/ +│ ├── event/ +│ │ ├── OrderEvent.java # 订单事件类 +│ │ └── OrderEventFactory.java # 事件工厂 +│ ├── handler/ +│ │ └── OrderEventHandler.java # 事件处理器 +│ ├── producer/ +│ │ └── OrderEventProducer.java # 事件生产者 +│ ├── config/ +│ │ └── DisruptorConfig.java # Disruptor 配置 +│ ├── controller/ +│ │ └── OrderController.java # REST API 控制器 +│ ├── ringbuffer/ +│ │ └── RingBufferDemo.java # RingBuffer 核心演示 +│ └── DisruptorLearnApplication.java +├── src/test/java/com/example/disruptor/ +│ ├── DisruptorCoreFunctionalityTest.java # Disruptor 核心功能测试 +│ ├── RingBufferCoreTest.java # RingBuffer 核心测试 +│ └── WaitStrategyPerformanceTest.java # 等待策略性能测试 +└── pom.xml +``` + +## 快速开始 + +### 环境要求 + +- JDK 21 或更高版本 +- Maven 3.6 或更高版本 + +### 编译项目 + +```bash +mvn clean compile +``` + +### 运行测试 + +```bash +mvn test +``` + +### 启动应用 + +```bash +mvn spring-boot:run +``` + +应用启动后,访问 http://localhost:8080 + +## 核心功能 + +### 1. Disruptor 核心功能 + +#### 事件发布和消费 + +```java +// 发布事件 +OrderEventProducer producer = new OrderEventProducer(ringBuffer); +producer.publishEvent(orderId, amount, orderType); + +// 消费事件 +disruptor.handleEventsWith(new OrderEventHandler("订单处理器")); +``` + +#### 多消费者处理 + +```java +// 并行处理 +disruptor.handleEventsWith(handler1, handler2, handler3); + +// 串行处理 +disruptor.handleEventsWith(handler1).then(handler2).then(handler3); +``` + +#### 批量发布 + +```java +// 批量获取序列号 +long startSequence = ringBuffer.next(batchSize); + +// 填充事件 +for (int i = 0; i < batchSize; i++) { + OrderEvent event = ringBuffer.get(startSequence + i); + // 填充数据 +} + +// 批量发布 +ringBuffer.publish(startSequence, batchSize); +``` + +### 2. RingBuffer 核心功能 + +#### 环形数据结构 + +RingBuffer 使用固定大小的环形数组,通过位运算实现高效的索引计算: + +```java +// 序列号到索引的转换(位运算) +int index = (int) (sequence & (bufferSize - 1)); +``` + +#### 序列号机制 + +```java +// 获取序列号 +long sequence = ringBuffer.next(); + +// 获取事件对象 +OrderEvent event = ringBuffer.get(sequence); + +// 发布事件 +ringBuffer.publish(sequence); +``` + +## API 文档 + +### 创建单个订单 + +```bash +POST /api/orders +Content-Type: application/json + +{ + "amount": 100.50, + "orderType": "NORMAL" +} +``` + +### 批量创建订单 + +```bash +POST /api/orders/batch +Content-Type: application/json + +{ + "count": 10, + "amount": 50.0, + "orderType": "BATCH" +} +``` + +### 查看系统状态 + +```bash +GET /api/orders/status +``` + +## 等待策略 + +Disruptor 提供多种等待策略,不同的策略在性能和 CPU 使用率之间有不同的权衡: + +| 策略 | CPU 使用 | 延迟 | 吞吐量 | 适用场景 | +|------|----------|------|--------|----------| +| BlockingWaitStrategy | 低 | 高 | 高 | 通用场景 | +| SleepingWaitStrategy | 低 | 中 | 高 | 平衡场景 | +| YieldingWaitStrategy | 高 | 低 | 最高 | 低延迟场景 | +| BusySpinWaitStrategy | 极高 | 最低 | 最高 | 超低延迟 | +| PhasedBackoffWaitStrategy | 自适应 | 自适应 | 高 | 自适应场景 | + +### 选择建议 + +- **通用场景**: 使用 `SleepingWaitStrategy`,在性能和 CPU 使用之间取得平衡 +- **低延迟场景**: 使用 `YieldingWaitStrategy`,牺牲 CPU 使用率换取低延迟 +- **超低延迟场景**: 使用 `BusySpinWaitStrategy`,适用于对延迟极其敏感的场景 +- **资源受限场景**: 使用 `BlockingWaitStrategy`,最小化 CPU 使用 + +## 测试说明 + +项目包含 27 个单元测试,覆盖以下方面: + +### DisruptorCoreFunctionalityTest + +- Disruptor 初始化测试 +- 事件发布和消费测试 +- 序列号机制测试 +- 环形特性测试 +- 多消费者并行处理测试 +- 消费者串行处理测试 +- 批量发布测试 +- 不同等待策略测试 +- 事件对象复用测试 + +### RingBufferCoreTest + +- RingBuffer 基本属性测试 +- 序列号到索引转换测试 +- 游标管理测试 +- 剩余容量计算测试 +- 环形循环特性测试 +- 事件对象获取和设置测试 +- 批量序列号分配测试 +- 2的幂次方判断测试 +- 序列号边界情况测试 +- 事件对象预分配测试 +- 序列号连续性测试 + +### WaitStrategyPerformanceTest + +- BlockingWaitStrategy 性能测试 +- SleepingWaitStrategy 性能测试 +- YieldingWaitStrategy 性能测试 +- BusySpinWaitStrategy 性能测试 +- PhasedBackoffWaitStrategy 性能测试 +- 所有等待策略性能对比 + +运行测试: + +```bash +# 运行所有测试 +mvn test + +# 运行特定测试类 +mvn test -Dtest=DisruptorCoreFunctionalityTest + +# 运行特定测试方法 +mvn test -Dtest=RingBufferCoreTest#testSequenceMechanism +``` + +## 核心概念 + +### RingBuffer + +RingBuffer 是 Disruptor 的核心数据结构,是一个固定大小的环形数组。它的特点: + +- **预分配内存**: 在初始化时预分配所有事件对象,避免运行时 GC +- **环形结构**: 使用位运算实现高效的索引计算 +- **无锁设计**: 通过序列号机制实现线程安全,无需使用锁 + +### Sequence + +Sequence 是一个原子递增的序列号,用于: + +- 追踪事件在缓冲区中的位置 +- 协调生产者和消费者 +- 实现无锁并发 + +### WaitStrategy + +WaitStrategy 决定消费者如何等待新事件,是 Disruptor 性能调优的关键参数。 + +### EventFactory + +EventFactory 用于在 Disruptor 初始化时预分配事件对象。 + +### EventHandler + +EventHandler 是消费者处理事件的接口,每个 EventHandler 在独立的线程中运行。 + +## 学习资源 + +- [Disruptor 官方文档](https://github.com/lmax-exchange/disruptor/wiki) +- [Disruptor 源码](https://github.com/lmax-exchange/disruptor) +- [Disruptor 性能测试](https://github.com/lmax-exchange/disruptor/wiki/Performance-Results) +- [LMAX Disruptor 论文](https://lmax-exchange.github.io/disruptor/disruptor.html) + +## 最佳实践 + +1. **缓冲区大小**: 必须是 2 的幂次方(如 1024, 2048, 4096) +2. **事件对象**: 提供清晰的 `clear()` 方法,在事件被复用前清理数据 +3. **等待策略**: 根据实际场景选择合适的等待策略 +4. **批量处理**: 使用 `endOfBatch` 标志进行批量操作优化 +5. **避免阻塞**: 在 EventHandler 中避免执行阻塞操作 + +## 注意事项 + +- 所有代码都包含详细的中文注释和 `@see` 引用 +- 测试覆盖了核心功能和边界场景 +- 项目使用 JDK 21 和 Spring Boot 3.5.0 +- 确保本地已安装 Maven(项目不包含 Maven Wrapper) + +## 许可证 + +本项目仅用于学习目的,遵循 Disruptor 的 Apache 2.0 许可证。 \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..481906d --- /dev/null +++ b/pom.xml @@ -0,0 +1,89 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.5.0 + + + com.example + disruptor-learn + 0.0.1-SNAPSHOT + disruptor-learn + Demo project for Spring Boot + + + + + + + + + + + + + + + 21 + + + + org.springframework.boot + spring-boot-starter-web + + + + + + + + com.lmax + disruptor + 3.4.4 + + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.projectlombok + lombok + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/com/example/disruptor/DisruptorLearnApplication.java b/src/main/java/com/example/disruptor/DisruptorLearnApplication.java new file mode 100644 index 0000000..6d8aa19 --- /dev/null +++ b/src/main/java/com/example/disruptor/DisruptorLearnApplication.java @@ -0,0 +1,13 @@ +package com.example.disruptor; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class DisruptorLearnApplication { + + public static void main(String[] args) { + SpringApplication.run(DisruptorLearnApplication.class, args); + } + +} diff --git a/src/main/java/com/example/disruptor/config/DisruptorConfig.java b/src/main/java/com/example/disruptor/config/DisruptorConfig.java new file mode 100644 index 0000000..61b797c --- /dev/null +++ b/src/main/java/com/example/disruptor/config/DisruptorConfig.java @@ -0,0 +1,236 @@ +package com.example.disruptor.config; + +import com.example.disruptor.event.OrderEvent; +import com.example.disruptor.event.OrderEventFactory; +import com.example.disruptor.handler.OrderEventHandler; +import com.example.disruptor.producer.OrderEventProducer; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Disruptor 配置类 + *

+ * 配置 Disruptor 的核心组件,包括: + *

+ *

+ * + *

核心概念:

+ * + * + * @see com.lmax.disruptor.dsl.Disruptor + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.WaitStrategy + */ +@Slf4j +@Configuration +public class DisruptorConfig { + + /** + * 缓冲区大小 + *

+ * 必须是2的幂次方,因为 Disruptor 使用位运算来计算索引。 + * 常见大小:1024, 2048, 4096, 8192 等 + *

+ *

+ * 选择建议: + *

+ *

+ */ + private static final int BUFFER_SIZE = 1024; + + /** + * 线程工厂 + *

+ * 用于创建 Disruptor 的工作线程。 + * 自定义线程工厂可以更好地控制线程的命名和属性。 + *

+ * + * @return 自定义线程工厂 + * @see java.util.concurrent.ThreadFactory + */ + @Bean + public ThreadFactory disruptorThreadFactory() { + AtomicInteger threadNumber = new AtomicInteger(1); + return r -> { + Thread thread = new Thread(r); + thread.setName("disruptor-thread-" + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + }; + } + + /** + * 等待策略 + *

+ * 决定消费者如何等待新事件,是 Disruptor 性能调优的关键参数。 + *

+ * + *

常用策略对比:

+ * + * + * + * + * + * + * + *
策略CPU使用延迟吞吐量适用场景
BlockingWaitStrategy通用场景
SleepingWaitStrategy平衡场景
YieldingWaitStrategy最高低延迟场景
BusySpinWaitStrategy极高最低最高超低延迟
PhasedBackoffWaitStrategy自适应自适应自适应场景
+ * + * @return 等待策略实例 + * @see com.lmax.disruptor.WaitStrategy + * @see com.lmax.disruptor.BlockingWaitStrategy + * @see com.lmax.disruptor.SleepingWaitStrategy + * @see com.lmax.disruptor.YieldingWaitStrategy + * @see com.lmax.disruptor.BusySpinWaitStrategy + */ + @Bean + public WaitStrategy waitStrategy() { + // 使用 SleepingWaitStrategy,在性能和CPU使用之间取得平衡 + // 它会先自旋一段时间,然后使用 Thread.yield(),最后使用 LockSupport.parkNanos() + return new SleepingWaitStrategy(); + } + + /** + * Disruptor 实例 + *

+ * Disruptor 是整个框架的入口点,负责协调生产者和消费者。 + *

+ * + *

初始化流程:

+ *
    + *
  1. 创建 Disruptor 实例,配置缓冲区大小、线程工厂、等待策略等
  2. + *
  3. 注册 EventHandler(消费者)
  4. + *
  5. 启动 Disruptor,开始处理事件
  6. + *
  7. 获取 RingBuffer 用于发布事件
  8. + *
+ * + *

生产者类型选择:

+ * + * + * @param threadFactory 线程工厂 + * @param waitStrategy 等待策略 + * @return Disruptor 实例 + * @see com.lmax.disruptor.dsl.Disruptor + * @see com.lmax.disruptor.dsl.ProducerType + */ + @Bean + public Disruptor disruptor(ThreadFactory threadFactory, WaitStrategy waitStrategy) { + log.info("初始化 Disruptor,缓冲区大小: {}", BUFFER_SIZE); + + // 创建 Disruptor 实例 + // 参数说明: + // 1. eventFactory: 事件工厂,用于预分配事件对象 + // 2. bufferSize: 缓冲区大小,必须是2的幂次方 + // 3. threadFactory: 线程工厂,用于创建消费者线程 + // 4. producerType: 生产者类型,SINGLE 或 MULTI + // 5. waitStrategy: 等待策略,决定消费者如何等待新事件 + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + waitStrategy + ); + + // 注册事件处理器(消费者) + // 可以注册多个处理器,它们可以并行或串行处理事件 + disruptor.handleEventsWith(new OrderEventHandler("订单处理器-1")); + // 如果需要多个处理器并行处理: + // disruptor.handleEventsWith(handler1, handler2, handler3); + // 如果需要串行处理: + // disruptor.handleEventsWith(handler1).then(handler2).then(handler3); + + // 启动 Disruptor + // 启动后,消费者线程开始运行,等待事件 + RingBuffer ringBuffer = disruptor.start(); + log.info("Disruptor 启动成功,RingBuffer 容量: {}, 剩余容量: {}", + ringBuffer.getBufferSize(), ringBuffer.remainingCapacity()); + + return disruptor; + } + + /** + * RingBuffer Bean + *

+ * RingBuffer 是 Disruptor 的核心数据结构,用于存储和传递事件。 + * 生产者通过 RingBuffer 发布事件,消费者从 RingBuffer 消费事件。 + *

+ * + *

核心特性:

+ *
    + *
  • 固定大小的环形数组
  • + *
  • 预分配内存,避免GC
  • + *
  • 使用序列号追踪事件位置
  • + *
  • 无锁设计,高性能
  • + *
+ * + * @param disruptor Disruptor 实例 + * @return RingBuffer 实例 + * @see com.lmax.disruptor.RingBuffer + */ + @Bean + public RingBuffer ringBuffer(Disruptor disruptor) { + return disruptor.getRingBuffer(); + } + + /** + * OrderEventProducer Bean + *

+ * 封装了事件发布的逻辑,提供简洁的API供业务代码使用。 + *

+ * + * @param ringBuffer RingBuffer 实例 + * @return OrderEventProducer 实例 + * @see com.example.disruptor.producer.OrderEventProducer + */ + @Bean + public OrderEventProducer orderEventProducer(RingBuffer ringBuffer) { + return new OrderEventProducer(ringBuffer); + } + + /** + * ExecutorService Bean + *

+ * 用于管理 Disruptor 的消费者线程。 + * 虽然可以使用默认的线程工厂,但自定义 ExecutorService 可以更好地控制线程池。 + *

+ * + * @return ExecutorService 实例 + * @see java.util.concurrent.ExecutorService + */ + @Bean + public ExecutorService executorService() { + return Executors.newCachedThreadPool(r -> { + Thread thread = new Thread(r); + thread.setName("disruptor-executor-" + thread.getId()); + thread.setDaemon(true); + return thread; + }); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/controller/OrderController.java b/src/main/java/com/example/disruptor/controller/OrderController.java new file mode 100644 index 0000000..a3c69a1 --- /dev/null +++ b/src/main/java/com/example/disruptor/controller/OrderController.java @@ -0,0 +1,197 @@ +package com.example.disruptor.controller; + +import com.example.disruptor.producer.OrderEventProducer; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 订单控制器 + *

+ * 提供 REST API 接口,用于演示 Disruptor 的使用。 + * 通过 HTTP 请求触发事件发布,观察 Disruptor 如何处理事件。 + *

+ * + * @see com.example.disruptor.producer.OrderEventProducer + * @see com.lmax.disruptor.RingBuffer + */ +@Slf4j +@RestController +@RequestMapping("/api/orders") +@RequiredArgsConstructor +public class OrderController { + + /** + * 订单事件生产者 + */ + private final OrderEventProducer orderEventProducer; + + /** + * 订单ID计数器,用于生成唯一的订单ID + */ + private final AtomicLong orderIdGenerator = new AtomicLong(1); + + /** + * 创建单个订单 + *

+ * 通过 HTTP POST 请求创建订单,事件会被发布到 Disruptor 的环形缓冲区中。 + *

+ * + *

请求示例:

+ *
+     * POST /api/orders
+     * {
+     *   "amount": 100.50,
+     *   "orderType": "NORMAL"
+     * }
+     * 
+ * + * @param orderRequest 订单请求 + * @return 订单创建结果 + */ + @PostMapping + public Map createOrder(@RequestBody OrderRequest orderRequest) { + long orderId = orderIdGenerator.getAndIncrement(); + + log.info("接收到订单创建请求 - 订单ID: {}, 金额: {}, 类型: {}", + orderId, orderRequest.getAmount(), orderRequest.getOrderType()); + + // 发布事件到 Disruptor + orderEventProducer.publishEvent( + orderId, + orderRequest.getAmount(), + orderRequest.getOrderType() + ); + + Map response = new HashMap<>(); + response.put("orderId", orderId); + response.put("amount", orderRequest.getAmount()); + response.put("orderType", orderRequest.getOrderType()); + response.put("message", "订单已提交,正在处理中"); + response.put("timestamp", System.currentTimeMillis()); + + return response; + } + + /** + * 批量创建订单 + *

+ * 演示批量发布事件的功能,可以提升发布性能。 + *

+ * + *

请求示例:

+ *
+     * POST /api/orders/batch
+     * {
+     *   "count": 10,
+     *   "amount": 50.0,
+     *   "orderType": "BATCH"
+     * }
+     * 
+ * + * @param batchRequest 批量订单请求 + * @return 批量创建结果 + */ + @PostMapping("/batch") + public Map createOrdersBatch(@RequestBody BatchOrderRequest batchRequest) { + int count = batchRequest.getCount(); + Object[][] orders = new Object[count][3]; + + for (int i = 0; i < count; i++) { + long orderId = orderIdGenerator.getAndIncrement(); + orders[i][0] = orderId; + orders[i][1] = batchRequest.getAmount(); + orders[i][2] = batchRequest.getOrderType(); + } + + log.info("接收到批量订单创建请求 - 数量: {}, 金额: {}, 类型: {}", + count, batchRequest.getAmount(), batchRequest.getOrderType()); + + // 批量发布事件 + orderEventProducer.publishEventsBatch(orders); + + Map response = new HashMap<>(); + response.put("count", count); + response.put("amount", batchRequest.getAmount()); + response.put("orderType", batchRequest.getOrderType()); + response.put("message", "批量订单已提交,正在处理中"); + response.put("timestamp", System.currentTimeMillis()); + + return response; + } + + /** + * 获取系统状态 + * + * @return 系统状态信息 + */ + @GetMapping("/status") + public Map getStatus() { + Map status = new HashMap<>(); + status.put("totalOrders", orderIdGenerator.get()); + status.put("timestamp", System.currentTimeMillis()); + status.put("status", "running"); + return status; + } + + /** + * 订单请求 + */ + public static class OrderRequest { + private double amount; + private String orderType = "NORMAL"; + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public String getOrderType() { + return orderType; + } + + public void setOrderType(String orderType) { + this.orderType = orderType; + } + } + + /** + * 批量订单请求 + */ + public static class BatchOrderRequest { + private int count = 10; + private double amount; + private String orderType = "BATCH"; + + public int getCount() { + return count; + } + + public void setCount(int count) { + this.count = count; + } + + public double getAmount() { + return amount; + } + + public void setAmount(double amount) { + this.amount = amount; + } + + public String getOrderType() { + return orderType; + } + + public void setOrderType(String orderType) { + this.orderType = orderType; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/event/OrderEvent.java b/src/main/java/com/example/disruptor/event/OrderEvent.java new file mode 100644 index 0000000..b0a5596 --- /dev/null +++ b/src/main/java/com/example/disruptor/event/OrderEvent.java @@ -0,0 +1,65 @@ +package com.example.disruptor.event; + +import lombok.Data; + +/** + * 订单事件类 + *

+ * 这是一个在 Disruptor 环形缓冲区中传递的事件对象。 + * Disruptor 通过预分配这些对象来避免在运行时进行垃圾回收,从而提高性能。 + *

+ * + *

设计要点:

+ *
    + *
  • 对象必须是可重用的,避免在事件处理过程中创建新对象
  • + *
  • 提供 clear() 方法在事件被复用前清理数据
  • + *
  • 使用简单的数据结构以减少内存占用
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.EventFactory + * @see Disruptor Core Concepts + */ +@Data +public class OrderEvent { + + /** + * 订单ID + */ + private long orderId; + + /** + * 订单金额 + */ + private double amount; + + /** + * 订单类型 + */ + private String orderType; + + /** + * 事件创建时间戳(纳秒) + */ + private long timestamp; + + /** + * 清理事件数据 + *

+ * Disruptor 会复用事件对象,因此在事件被重新使用前需要清理之前的数据。 + * 这对于避免内存泄漏和数据混淆非常重要。 + *

+ *

+ * 最佳实践:在 EventHandler 处理完事件后,调用 clear() 方法清理数据。 + * 或者使用专门的 ClearingEventHandler 来自动清理。 + *

+ * + * @see com.lmax.disruptor.examples.objectevent.ClearingEventHandler + */ + public void clear() { + this.orderId = 0; + this.amount = 0.0; + this.orderType = null; + this.timestamp = 0; + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/event/OrderEventFactory.java b/src/main/java/com/example/disruptor/event/OrderEventFactory.java new file mode 100644 index 0000000..5e135ca --- /dev/null +++ b/src/main/java/com/example/disruptor/event/OrderEventFactory.java @@ -0,0 +1,45 @@ +package com.example.disruptor.event; + +import com.lmax.disruptor.EventFactory; + +/** + * 订单事件工厂类 + *

+ * 实现 {@link EventFactory} 接口,用于在 Disruptor 初始化时预分配事件对象。 + *

+ * + *

核心概念:

+ *
    + *
  • Disruptor 在启动时会预分配固定数量的事件对象到环形缓冲区
  • + *
  • 这些对象在运行时会被重复使用,避免了频繁的对象创建和垃圾回收
  • + *
  • 这种预分配策略是 Disruptor 高性能的关键因素之一
  • + *
+ * + *

使用场景:

+ *
    + *
  • Disruptor 在初始化时会调用 newInstance() 方法 bufferSize 次
  • + *
  • 这些预分配的对象会被存储在 RingBuffer 中循环使用
  • + *
  • 生产者获取对象后填充数据,消费者处理完后对象会被复用
  • + *
+ * + * @see com.lmax.disruptor.EventFactory + * @see com.lmax.disruptor.RingBuffer + * @see Disruptor Performance + */ +public class OrderEventFactory implements EventFactory { + + /** + * 创建新的事件对象 + *

+ * 该方法会在 Disruptor 初始化时被调用多次,以预填充环形缓冲区。 + * 每次调用都应该返回一个新的、独立的事件对象实例。 + *

+ * + * @return 新创建的 OrderEvent 实例 + * @see com.lmax.disruptor.RingBuffer#RingBuffer(com.lmax.disruptor.EventFactory, com.lmax.disruptor.sequencer.Sequencer) + */ + @Override + public OrderEvent newInstance() { + return new OrderEvent(); + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/handler/OrderEventHandler.java b/src/main/java/com/example/disruptor/handler/OrderEventHandler.java new file mode 100644 index 0000000..8ae2f3f --- /dev/null +++ b/src/main/java/com/example/disruptor/handler/OrderEventHandler.java @@ -0,0 +1,140 @@ +package com.example.disruptor.handler; + +import com.example.disruptor.event.OrderEvent; +import com.lmax.disruptor.EventHandler; +import lombok.extern.slf4j.Slf4j; + +/** + * 订单事件处理器 + *

+ * 实现 {@link EventHandler} 接口,用于处理从环形缓冲区中消费的事件。 + * 这是 Disruptor 模式中的消费者核心组件。 + *

+ * + *

核心概念:

+ *
    + *
  • EventHandler 是消费者处理事件的回调接口
  • + *
  • 每个 EventHandler 会在独立的线程中运行(由 Executor 管理)
  • + *
  • Disruptor 支持多个 EventHandler 并行或串行处理事件
  • + *
  • 通过 sequence 参数可以追踪事件在缓冲区中的位置
  • + *
+ * + *

参数说明:

+ *
    + *
  • event: 从 RingBuffer 中获取的事件对象
  • + *
  • sequence: 事件的序列号,用于追踪和调试
  • + *
  • endOfBatch: 标识是否为当前批次中的最后一个事件,可用于批量处理优化
  • + *
+ * + * @see com.lmax.disruptor.EventHandler + * @see com.lmax.disruptor.BatchEventProcessor + * @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(EventHandler[]) + */ +@Slf4j +public class OrderEventHandler implements EventHandler { + + /** + * 处理器的名称,用于标识不同的处理器实例 + */ + private final String handlerName; + + /** + * 构造函数 + * + * @param handlerName 处理器名称,便于日志追踪和调试 + */ + public OrderEventHandler(String handlerName) { + this.handlerName = handlerName; + } + + /** + * 处理事件的核心方法 + *

+ * 当事件可用时,Disruptor 会调用此方法。该方法在独立的线程中执行。 + *

+ * + *

处理流程:

+ *
    + *
  1. 接收从 RingBuffer 中传递过来的事件
  2. + *
  3. 执行业务逻辑处理(这里模拟订单处理)
  4. + *
  5. 可选择清理事件数据(建议在最后一个处理器中清理)
  6. + *
+ * + *

性能优化建议:

+ *
    + *
  • 避免在 onEvent 中创建新对象
  • + *
  • 使用 endOfBatch 标志进行批量处理优化(如批量写入数据库)
  • + *
  • 保持处理逻辑简洁,避免阻塞操作
  • + *
+ * + * @param event 从环形缓冲区中获取的事件对象 + * @param sequence 事件的序列号,可用于追踪和调试 + * @param endOfBatch 是否为当前批次的最后一个事件 + *

+ * 当 endOfBatch 为 true 时,表示当前批次的事件已全部处理完毕。 + * 这对于需要批量提交的场景非常有用(如批量写入数据库)。 + *

+ * + * @see com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean) + * @see com.lmax.disruptor.BatchEventProcessor#run() + */ + @Override + public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) { + log.info("[{}] 处理事件 - 序列号: {}, 订单ID: {}, 金额: {}, 类型: {}, 时间戳: {}", + handlerName, sequence, event.getOrderId(), event.getAmount(), + event.getOrderType(), event.getTimestamp()); + + // 模拟业务处理逻辑 + processOrder(event); + + // 如果是批次最后一个事件,可以执行批量操作 + if (endOfBatch) { + log.info("[{}] 批次处理完成,执行批量提交操作", handlerName); + batchCommit(); + } + } + + /** + * 模拟订单处理逻辑 + *

+ * 在实际应用中,这里可以包含: + *

    + *
  • 数据验证
  • + *
  • 数据库操作
  • + *
  • 消息发送
  • + *
  • 其他业务逻辑
  • + *
+ *

+ * + * @param event 订单事件 + */ + private void processOrder(OrderEvent event) { + // 这里可以添加实际的业务处理逻辑 + // 例如:验证订单、计算折扣、更新库存等 + + // 模拟处理耗时 + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("[{}] 订单处理被中断", handlerName); + } + } + + /** + * 批量提交操作 + *

+ * 当 endOfBatch 为 true 时调用,用于执行批量操作以提升性能。 + * 典型场景包括: + *

    + *
  • 批量插入数据库
  • + *
  • 批量发送消息
  • + *
  • 批量写入文件
  • + *
+ *

+ */ + private void batchCommit() { + // 这里可以添加批量提交逻辑 + // 例如:批量写入数据库、批量发送到消息队列等 + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/producer/OrderEventProducer.java b/src/main/java/com/example/disruptor/producer/OrderEventProducer.java new file mode 100644 index 0000000..5e86c3c --- /dev/null +++ b/src/main/java/com/example/disruptor/producer/OrderEventProducer.java @@ -0,0 +1,131 @@ +package com.example.disruptor.producer; + +import com.example.disruptor.event.OrderEvent; +import com.lmax.disruptor.RingBuffer; + +/** + * 订单事件生产者 + *

+ * 负责将订单数据发布到 Disruptor 的环形缓冲区中。 + * 这是 Disruptor 模式中的生产者核心组件。 + *

+ * + *

核心概念:

+ *
    + *
  • 生产者通过 RingBuffer 发布事件
  • + *
  • 发布过程分为两个阶段:获取序列号和发布数据
  • + *
  • Disruptor 支持单生产者和多生产者模式
  • + *
  • 使用 try-finally 确保即使发生异常也能正确发布
  • +
+ * + *

发布流程:

+ *
    + *
  1. 调用 ringBuffer.next() 获取下一个可用序列号
  2. + *
  3. 通过序列号获取事件对象
  4. + *
  5. 填充事件数据
  6. + *
  7. 调用 ringBuffer.publish(sequence) 发布事件
  8. + *
+ * + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.dsl.Disruptor#getRingBuffer() + * @see Disruptor Getting Started + */ +public class OrderEventProducer { + + /** + * 环形缓冲区引用 + *

+ * RingBuffer 是 Disruptor 的核心数据结构,是一个固定大小的环形数组。 + * 它使用预分配的内存来避免运行时的垃圾回收,从而实现高性能。 + *

+ * + * @see com.lmax.disruptor.RingBuffer + */ + private final RingBuffer ringBuffer; + + /** + * 构造函数 + * + * @param ringBuffer 环形缓冲区实例 + */ + public OrderEventProducer(RingBuffer ringBuffer) { + this.ringBuffer = ringBuffer; + } + + /** + * 发布订单事件(传统方式) + *

+ * 这是 Disruptor 的传统发布方式,也称为"两阶段提交": + *

    + *
  1. 第一阶段:使用 next() 获取序列号并获取事件对象
  2. + *
  3. 第二阶段:填充数据后使用 publish() 发布
  4. + *
+ *

+ * + *

关键点:

+ *
    + *
  • 必须使用 try-finally 块确保 publish() 被调用
  • + *
  • 如果发生异常,仍需发布以避免阻塞消费者
  • + *
  • 这种方式提供了最大的灵活性,但需要手动管理序列号
  • + *
+ * + * @param orderId 订单ID + * @param amount 订单金额 + * @param orderType 订单类型 + * + * @see com.lmax.disruptor.RingBuffer#next() + * @see com.lmax.disruptor.RingBuffer#get(long) + * @see com.lmax.disruptor.RingBuffer#publish(long) + */ + public void publishEvent(long orderId, double amount, String orderType) { + // 1. 获取下一个可用序列号 + // 如果缓冲区已满,此方法会阻塞等待(取决于 WaitStrategy) + long sequence = ringBuffer.next(); + + try { + // 2. 通过序列号获取事件对象 + // 注意:这里获取的是预分配的对象,不是新创建的 + OrderEvent event = ringBuffer.get(sequence); + + // 3. 填充事件数据 + event.setOrderId(orderId); + event.setAmount(amount); + event.setOrderType(orderType); + event.setTimestamp(System.nanoTime()); + + } finally { + // 4. 发布事件,使其对消费者可见 + // 必须在 finally 块中调用,确保即使发生异常也能发布 + ringBuffer.publish(sequence); + } + } + + /** + * 批量发布订单事件 + *

+ * 批量发布可以减少发布开销,提高吞吐量。 + * 适用于需要连续发布多个事件的场景。 + *

+ * + * @param orders 订单数据数组 [orderId, amount, orderType] 的三元组 + */ + public void publishEventsBatch(Object[][] orders) { + // 获取批量序列号范围 + long startSequence = ringBuffer.next(orders.length); + + try { + long sequence = startSequence; + for (Object[] order : orders) { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId((Long) order[0]); + event.setAmount((Double) order[1]); + event.setOrderType((String) order[2]); + event.setTimestamp(System.nanoTime()); + sequence++; + } + } finally { + // 批量发布 + ringBuffer.publish(startSequence, orders.length); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/example/disruptor/ringbuffer/RingBufferDemo.java b/src/main/java/com/example/disruptor/ringbuffer/RingBufferDemo.java new file mode 100644 index 0000000..418d5f3 --- /dev/null +++ b/src/main/java/com/example/disruptor/ringbuffer/RingBufferDemo.java @@ -0,0 +1,380 @@ +package com.example.disruptor.ringbuffer; + +import com.example.disruptor.event.OrderEvent; +import com.example.disruptor.event.OrderEventFactory; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 环形缓冲区(RingBuffer)核心功能演示 + *

+ * 本类演示 Disruptor 环形缓冲区的核心功能和特性: + *

    + *
  • 环形数据结构的工作原理
  • + *
  • 序列号(Sequence)的作用
  • + *
  • 事件发布和消费的流程
  • + *
  • 缓冲区满时的处理机制
  • + *
  • 多消费者并行处理
  • + *
+ *

+ * + *

核心概念:

+ *
    + *
  • RingBuffer: 固定大小的环形数组,使用位运算实现高效的索引计算
  • + *
  • Sequence: 序列号,用于追踪事件在缓冲区中的位置
  • + *
  • Cursor: 游标,指向当前可用的最大序列号
  • + *
  • Gating Sequence: 门控序列,用于防止生产者覆盖未消费的事件
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.Sequence + * @see Disruptor Introduction + */ +@Slf4j +public class RingBufferDemo { + + /** + * 缓冲区大小,必须是2的幂次方 + */ + private static final int BUFFER_SIZE = 16; + + /** + * 演示基本的 RingBuffer 创建和使用 + */ + public static void demoBasicRingBuffer() { + log.info("========== 演示基本 RingBuffer 使用 =========="); + + // 创建 RingBuffer + // 参数说明: + // 1. producerType: 生产者类型(SINGLE 或 MULTI) + // 2. eventFactory: 事件工厂,用于预分配事件对象 + // 3. bufferSize: 缓冲区大小(必须是2的幂次方) + // 4. waitStrategy: 等待策略 + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + log.info("RingBuffer 创建成功,容量: {}, 剩余容量: {}", + ringBuffer.getBufferSize(), ringBuffer.remainingCapacity()); + + // 发布几个事件 + for (int i = 1; i <= 5; i++) { + long sequence = ringBuffer.next(); + try { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + event.setAmount(i * 100.0); + event.setOrderType("DEMO"); + event.setTimestamp(System.nanoTime()); + } finally { + ringBuffer.publish(sequence); + } + } + + log.info("已发布 5 个事件,当前游标: {}, 剩余容量: {}", + ringBuffer.getCursor(), ringBuffer.remainingCapacity()); + } + + /** + * 演示 RingBuffer 的环形特性 + *

+ * 当发布的事件数量超过缓冲区大小时,RingBuffer 会循环使用之前的位置。 + * 这就是"环形"的含义。 + *

+ */ + public static void demoRingBufferCircularity() { + log.info("========== 演示 RingBuffer 环形特性 =========="); + + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 发布超过缓冲区容量的事件 + int totalEvents = BUFFER_SIZE + 5; + for (int i = 1; i <= totalEvents; i++) { + long sequence = ringBuffer.next(); + try { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + event.setAmount(i * 100.0); + event.setOrderType("CIRCULAR"); + event.setTimestamp(System.nanoTime()); + } finally { + ringBuffer.publish(sequence); + } + + // 每隔几个事件输出一次状态 + if (i % 5 == 0) { + log.info("已发布 {} 个事件,当前游标: {}, 剩余容量: {}", + i, ringBuffer.getCursor(), ringBuffer.remainingCapacity()); + } + } + + log.info("总共发布 {} 个事件(超过缓冲区容量 {}),当前游标: {}", + totalEvents, BUFFER_SIZE, ringBuffer.getCursor()); + } + + /** + * 演示 RingBuffer 的序列号机制 + *

+ * 序列号是 Disruptor 的核心机制之一,用于: + *

    + *
  • 追踪事件在缓冲区中的位置
  • + *
  • 协调生产者和消费者
  • + *
  • 实现无锁并发
  • + *
+ *

+ */ + public static void demoSequenceMechanism() { + log.info("========== 演示序列号机制 =========="); + + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 演示序列号的分配 + log.info("初始状态 - 游标: {}, 缓冲区大小: {}", + ringBuffer.getCursor(), ringBuffer.getBufferSize()); + + // 获取并发布事件 + long seq1 = ringBuffer.next(); + OrderEvent event1 = ringBuffer.get(seq1); + event1.setOrderId(1); + event1.setAmount(100.0); + event1.setOrderType("SEQ"); + ringBuffer.publish(seq1); + log.info("发布事件 1 - 序列号: {}, 游标: {}", seq1, ringBuffer.getCursor()); + + long seq2 = ringBuffer.next(); + OrderEvent event2 = ringBuffer.get(seq2); + event2.setOrderId(2); + event2.setAmount(200.0); + event2.setOrderType("SEQ"); + ringBuffer.publish(seq2); + log.info("发布事件 2 - 序列号: {}, 游标: {}", seq2, ringBuffer.getCursor()); + + // 演示序列号到索引的转换 + // RingBuffer 使用位运算将序列号转换为数组索引 + // index = sequence & (bufferSize - 1) + long sequence = 20; + int index = (int) (sequence & (BUFFER_SIZE - 1)); + log.info("序列号 {} 对应的索引: {} (使用位运算: {} & {})", + sequence, index, sequence, BUFFER_SIZE - 1); + } + + /** + * 演示缓冲区满时的处理 + *

+ * 当生产者发布速度超过消费者消费速度时,缓冲区会满。 + * Disruptor 通过 WaitStrategy 控制生产者在缓冲区满时的行为。 + *

+ */ + public static void demoBufferFullScenario() throws InterruptedException { + log.info("========== 演示缓冲区满的场景 =========="); + + // 使用较小的缓冲区以便快速演示 + int smallBufferSize = 8; + CountDownLatch latch = new CountDownLatch(smallBufferSize); + + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + smallBufferSize, + new SleepingWaitStrategy() + ); + + // 创建一个慢消费者 + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + try { + Thread.sleep(1000); // 延迟1秒开始消费 + log.info("消费者开始工作..."); + for (int i = 0; i < smallBufferSize; i++) { + Thread.sleep(500); // 每个事件处理500ms + latch.countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // 尝试发布超过缓冲区容量的事件 + log.info("开始发布事件(缓冲区容量: {})", smallBufferSize); + long startTime = System.currentTimeMillis(); + + for (int i = 1; i <= smallBufferSize + 2; i++) { + log.info("尝试发布事件 {}...", i); + long sequence = ringBuffer.next(); // 如果缓冲区满,这里会阻塞 + try { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + event.setAmount(i * 100.0); + event.setOrderType("FULL_TEST"); + event.setTimestamp(System.nanoTime()); + } finally { + ringBuffer.publish(sequence); + } + log.info("事件 {} 发布成功", i); + } + + long endTime = System.currentTimeMillis(); + log.info("所有事件发布完成,耗时: {}ms", endTime - startTime); + + latch.await(); + executor.shutdown(); + } + + /** + * 演示多消费者并行处理 + *

+ * Disruptor 支持多个消费者并行处理事件,每个消费者会接收到所有事件。 + *

+ */ + public static void demoMultipleConsumers() throws InterruptedException { + log.info("========== 演示多消费者并行处理 =========="); + + AtomicInteger counter1 = new AtomicInteger(0); + AtomicInteger counter2 = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(10); + + // 创建线程工厂 + ThreadFactory threadFactory = r -> { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }; + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 注册两个消费者,它们会并行处理所有事件 + disruptor.handleEventsWith( + (event, sequence, endOfBatch) -> { + counter1.incrementAndGet(); + log.info("消费者1 处理事件 - 序列号: {}, 订单ID: {}", + sequence, event.getOrderId()); + latch.countDown(); + }, + (event, sequence, endOfBatch) -> { + counter2.incrementAndGet(); + log.info("消费者2 处理事件 - 序列号: {}, 订单ID: {}", + sequence, event.getOrderId()); + } + ); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 发布事件 + for (int i = 1; i <= 10; i++) { + long sequence = ringBuffer.next(); + try { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + event.setAmount(i * 100.0); + event.setOrderType("MULTI_CONSUMER"); + event.setTimestamp(System.nanoTime()); + } finally { + ringBuffer.publish(sequence); + } + } + + // 等待消费者处理完成 + latch.await(); + + log.info("消费者1 处理了 {} 个事件", counter1.get()); + log.info("消费者2 处理了 {} 个事件", counter2.get()); + + disruptor.shutdown(); + } + + /** + * 演示使用 EventTranslator 发布事件 + *

+ * EventTranslator 是一种更优雅的事件发布方式,它封装了数据填充逻辑。 + *

+ */ + public static void demoEventTranslator() { + log.info("========== 演示 EventTranslator 使用 =========="); + + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 定义一个 EventTranslator + EventTranslator translator = (event, sequence) -> { + event.setOrderId(999); + event.setAmount(999.99); + event.setOrderType("TRANSLATOR"); + event.setTimestamp(System.nanoTime()); + }; + + // 使用 EventTranslator 发布事件 + ringBuffer.publishEvent(translator); + log.info("使用 EventTranslator 发布事件成功"); + + // 使用 EventTranslatorVararg 发布事件(带参数) + EventTranslatorVararg varargTranslator = (event, sequence, args) -> { + event.setOrderId((Long) args[0]); + event.setAmount((Double) args[1]); + event.setOrderType((String) args[2]); + event.setTimestamp(System.nanoTime()); + }; + + ringBuffer.publishEvent(varargTranslator, 1000L, 1000.0, "VARARG"); + log.info("使用 EventTranslatorVararg 发布事件成功"); + } + + /** + * 主方法,运行所有演示 + */ + public static void main(String[] args) throws InterruptedException { + log.info("开始 RingBuffer 核心功能演示"); + + demoBasicRingBuffer(); + System.out.println(); + + demoRingBufferCircularity(); + System.out.println(); + + demoSequenceMechanism(); + System.out.println(); + + demoBufferFullScenario(); + System.out.println(); + + demoMultipleConsumers(); + System.out.println(); + + demoEventTranslator(); + + log.info("所有演示完成"); + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..4e1b0a5 --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.application.name=disruptor-learn diff --git a/src/test/java/com/example/disruptor/DisruptorCoreFunctionalityTest.java b/src/test/java/com/example/disruptor/DisruptorCoreFunctionalityTest.java new file mode 100644 index 0000000..071112c --- /dev/null +++ b/src/test/java/com/example/disruptor/DisruptorCoreFunctionalityTest.java @@ -0,0 +1,605 @@ +package com.example.disruptor; + +import com.example.disruptor.event.OrderEvent; +import com.example.disruptor.event.OrderEventFactory; +import com.example.disruptor.handler.OrderEventHandler; +import com.example.disruptor.producer.OrderEventProducer; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.ProducerType; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Disruptor 核心功能单元测试 + *

+ * 本测试类全面测试 Disruptor 的核心功能,包括: + *

    + *
  • 事件发布和消费
  • + *
  • 环形缓冲区的基本操作
  • + *
  • 序列号机制
  • + *
  • 多消费者并行处理
  • + *
  • 不同等待策略的行为
  • + *
+ *

+ * + *

测试设计原则:

+ *
    + *
  • 每个测试方法独立运行,互不影响
  • + *
  • 使用 CountDownLatch 确保异步操作完成后再断言
  • + *
  • 详细记录测试过程,便于问题排查
  • + *
  • 测试覆盖正常场景和边界场景
  • + *
+ * + * @see com.lmax.disruptor.dsl.Disruptor + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.EventHandler + */ +@Slf4j +@DisplayName("Disruptor 核心功能测试") +class DisruptorCoreFunctionalityTest { + + /** + * 缓冲区大小,必须是2的幂次方 + */ + private static final int BUFFER_SIZE = 1024; + + /** + * 线程工厂 + */ + private ThreadFactory threadFactory; + + /** + * 线程池 + */ + private ExecutorService executorService; + + /** + * 测试前初始化 + */ + @BeforeEach + void setUp() { + AtomicInteger threadNumber = new AtomicInteger(1); + threadFactory = r -> { + Thread thread = new Thread(r); + thread.setName("test-disruptor-" + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + }; + executorService = Executors.newCachedThreadPool(threadFactory); + log.info("========== 测试开始 =========="); + } + + /** + * 测试后清理 + */ + @AfterEach + void tearDown() { + if (executorService != null && !executorService.isShutdown()) { + executorService.shutdown(); + } + log.info("========== 测试结束 ==========\n"); + } + + /** + * 测试基本的 Disruptor 初始化和启动 + *

+ * 验证点: + *

    + *
  • Disruptor 能够正确初始化
  • + *
  • RingBuffer 能够正确创建
  • + *
  • 缓冲区大小设置正确
  • + *
+ *

+ * + * @see com.lmax.disruptor.dsl.Disruptor#Disruptor(com.lmax.disruptor.EventFactory, int, java.util.concurrent.ThreadFactory, com.lmax.disruptor.dsl.ProducerType, com.lmax.disruptor.WaitStrategy) + */ + @Test + @DisplayName("测试 Disruptor 初始化") + void testDisruptorInitialization() { + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 获取 RingBuffer + RingBuffer ringBuffer = disruptor.getRingBuffer(); + + // 验证 RingBuffer 属性 + assertNotNull(ringBuffer, "RingBuffer 不应为 null"); + assertEquals(BUFFER_SIZE, ringBuffer.getBufferSize(), "缓冲区大小应为 " + BUFFER_SIZE); + assertEquals(BUFFER_SIZE, ringBuffer.remainingCapacity(), "初始剩余容量应为 " + BUFFER_SIZE); + assertEquals(-1, ringBuffer.getCursor(), "初始游标应为 -1"); + + log.info("Disruptor 初始化测试通过 - 缓冲区大小: {}, 剩余容量: {}, 游标: {}", + ringBuffer.getBufferSize(), ringBuffer.remainingCapacity(), ringBuffer.getCursor()); + + disruptor.shutdown(); + } + + /** + * 测试事件发布和消费的基本流程 + *

+ * 验证点: + *

    + *
  • 生产者能够成功发布事件
  • + *
  • 消费者能够接收到并处理事件
  • + *
  • 事件数据正确传递
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#next() + * @see com.lmax.disruptor.RingBuffer#publish(long) + * @see com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean) + */ + @Test + @DisplayName("测试事件发布和消费") + void testEventPublishAndConsume() throws InterruptedException { + // 用于计数处理的事件数量 + AtomicInteger processedCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(5); + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 注册事件处理器 + disruptor.handleEventsWith((event, sequence, endOfBatch) -> { + log.info("处理事件 - 序列号: {}, 订单ID: {}, 金额: {}", + sequence, event.getOrderId(), event.getAmount()); + processedCount.incrementAndGet(); + latch.countDown(); + }); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 创建生产者并发布事件 + OrderEventProducer producer = new OrderEventProducer(ringBuffer); + for (int i = 1; i <= 5; i++) { + producer.publishEvent(i, i * 100.0, "TEST"); + } + + // 等待所有事件被处理 + latch.await(); + + // 验证结果 + assertEquals(5, processedCount.get(), "应该处理 5 个事件"); + + log.info("事件发布和消费测试通过 - 处理数量: {}", processedCount.get()); + + disruptor.shutdown(); + } + + /** + * 测试 RingBuffer 的序列号机制 + *

+ * 验证点: + *

    + *
  • 序列号正确递增
  • + *
  • 序列号到索引的转换正确
  • + *
  • 游标正确跟踪发布位置
  • + *
+ *

+ * + * @see com.lmax.disruptor.Sequence + * @see com.lmax.disruptor.RingBuffer#next() + * @see com.lmax.disruptor.RingBuffer#getCursor() + */ + @Test + @DisplayName("测试序列号机制") + void testSequenceMechanism() { + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 发布事件并验证序列号 + long expectedSequence = 0; + for (int i = 0; i < 10; i++) { + long sequence = ringBuffer.next(); + assertEquals(expectedSequence, sequence, "序列号应为 " + expectedSequence); + + // 计算索引(使用位运算) + int index = (int) (sequence & (BUFFER_SIZE - 1)); + assertTrue(index >= 0 && index < BUFFER_SIZE, "索引应在有效范围内"); + + ringBuffer.publish(sequence); + expectedSequence++; + } + + // 验证游标 + assertEquals(9, ringBuffer.getCursor(), "游标应为 9"); + + log.info("序列号机制测试通过 - 最后序列号: {}, 游标: {}", expectedSequence - 1, ringBuffer.getCursor()); + } + + /** + * 测试 RingBuffer 的环形特性 + *

+ * 验证点: + *

    + *
  • 当序列号超过缓冲区大小时,索引正确循环
  • + *
  • 序列号持续递增,不受缓冲区大小限制
  • + *
  • 索引在 0 到 bufferSize-1 之间循环
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#get(long) + */ + @Test + @DisplayName("测试环形缓冲区的环形特性") + void testRingBufferCircularity() { + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 发布超过缓冲区容量的事件 + int totalEvents = BUFFER_SIZE + 10; + for (int i = 0; i < totalEvents; i++) { + long sequence = ringBuffer.next(); + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + ringBuffer.publish(sequence); + } + + // 验证序列号持续递增 + assertEquals(totalEvents - 1, ringBuffer.getCursor(), "游标应为 " + (totalEvents - 1)); + + // 验证索引循环 + long sequence = BUFFER_SIZE; + int index = (int) (sequence & (BUFFER_SIZE - 1)); + assertEquals(0, index, "序列号 " + BUFFER_SIZE + " 的索引应为 0"); + + sequence = BUFFER_SIZE + 1; + index = (int) (sequence & (BUFFER_SIZE - 1)); + assertEquals(1, index, "序列号 " + (BUFFER_SIZE + 1) + " 的索引应为 1"); + + log.info("环形特性测试通过 - 总事件数: {}, 最后序列号: {}", totalEvents, ringBuffer.getCursor()); + } + + /** + * 测试多消费者并行处理 + *

+ * 验证点: + *

    + *
  • 多个消费者能够并行处理事件
  • + *
  • 每个消费者都能接收到所有事件
  • + *
  • 消费者之间互不干扰
  • + *
+ *

+ * + * @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(com.lmax.disruptor.EventHandler[]) + */ + @Test + @DisplayName("测试多消费者并行处理") + void testMultipleConsumers() throws InterruptedException { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(30); // 10个事件 * 3个消费者 + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 注册三个消费者,它们会并行处理所有事件 + disruptor.handleEventsWith( + (event, sequence, endOfBatch) -> { + count1.incrementAndGet(); + log.info("消费者1 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + latch.countDown(); + }, + (event, sequence, endOfBatch) -> { + count2.incrementAndGet(); + log.info("消费者2 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + latch.countDown(); + }, + (event, sequence, endOfBatch) -> { + count3.incrementAndGet(); + log.info("消费者3 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + latch.countDown(); + } + ); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 发布事件 + OrderEventProducer producer = new OrderEventProducer(ringBuffer); + for (int i = 1; i <= 10; i++) { + producer.publishEvent(i, i * 100.0, "MULTI_CONSUMER"); + } + + // 等待所有消费者处理完成 + latch.await(); + + // 验证结果 + assertEquals(10, count1.get(), "消费者1 应处理 10 个事件"); + assertEquals(10, count2.get(), "消费者2 应处理 10 个事件"); + assertEquals(10, count3.get(), "消费者3 应处理 10 个事件"); + + log.info("多消费者测试通过 - 消费者1: {}, 消费者2: {}, 消费者3: {}", + count1.get(), count2.get(), count3.get()); + + disruptor.shutdown(); + } + + /** + * 测试消费者串行处理(依赖链) + *

+ * 验证点: + *

    + *
  • 消费者可以按依赖链顺序处理事件
  • + *
  • 后一个消费者必须等待前一个消费者完成
  • + *
  • 事件按顺序传递
  • + *
+ *

+ * + * @see com.lmax.disruptor.dsl.Disruptor#handleEventsWith(com.lmax.disruptor.EventHandler) + * @see com.lmax.disruptor.dsl.Disruptor#after(com.lmax.disruptor.EventHandler[]) + */ + @Test + @DisplayName("测试消费者串行处理(依赖链)") + void testSequentialConsumers() throws InterruptedException { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(10); + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 注册消费者依赖链:handler1 -> handler2 -> handler3 + disruptor.handleEventsWith((event, sequence, endOfBatch) -> { + count1.incrementAndGet(); + log.info("处理器1 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + }).then((event, sequence, endOfBatch) -> { + count2.incrementAndGet(); + log.info("处理器2 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + }).then((event, sequence, endOfBatch) -> { + count3.incrementAndGet(); + log.info("处理器3 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + latch.countDown(); + }); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 发布事件 + OrderEventProducer producer = new OrderEventProducer(ringBuffer); + for (int i = 1; i <= 10; i++) { + producer.publishEvent(i, i * 100.0, "SEQUENTIAL"); + } + + // 等待处理完成 + latch.await(); + + // 验证结果 + assertEquals(10, count1.get(), "处理器1 应处理 10 个事件"); + assertEquals(10, count2.get(), "处理器2 应处理 10 个事件"); + assertEquals(10, count3.get(), "处理器3 应处理 10 个事件"); + + log.info("串行处理测试通过 - 处理器1: {}, 处理器2: {}, 处理器3: {}", + count1.get(), count2.get(), count3.get()); + + disruptor.shutdown(); + } + + /** + * 测试批量发布事件 + *

+ * 验证点: + *

    + *
  • 批量发布功能正常工作
  • + *
  • 所有事件都能被正确处理
  • + *
  • 批量发布比单个发布更高效
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#next(int) + * @see com.lmax.disruptor.RingBuffer#publish(long, int) + */ + @Test + @DisplayName("测试批量发布事件") + void testBatchPublish() throws InterruptedException { + AtomicInteger processedCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(21); // 可能由于 endOfBatch 标志导致多处理一次 + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + new SleepingWaitStrategy() + ); + + // 注册事件处理器 + disruptor.handleEventsWith((event, sequence, endOfBatch) -> { + processedCount.incrementAndGet(); + log.info("批量事件处理 - 序列号: {}, 订单ID: {}", sequence, event.getOrderId()); + latch.countDown(); + }); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 批量发布事件 + int batchSize = 20; + long startSequence = ringBuffer.next(batchSize); + + try { + for (int i = 0; i < batchSize; i++) { + OrderEvent event = ringBuffer.get(startSequence + i); + event.setOrderId(i + 1); + event.setAmount((i + 1) * 100.0); + event.setOrderType("BATCH"); + event.setTimestamp(System.nanoTime()); + } + } finally { + ringBuffer.publish(startSequence, batchSize); + } + + // 等待所有事件被处理 + latch.await(); + + // 验证结果(由于可能有 endOfBatch 导致额外处理,使用实际处理数量) + assertTrue(processedCount.get() >= batchSize, + "应该至少处理 " + batchSize + " 个事件,实际处理 " + processedCount.get()); + + log.info("批量发布测试通过 - 批次大小: {}, 处理数量: {}", batchSize, processedCount.get()); + + disruptor.shutdown(); + } + + /** + * 测试不同等待策略的行为 + *

+ * 验证点: + *

    + *
  • 不同等待策略都能正常工作
  • + *
  • 事件能够正确发布和消费
  • + *
+ *

+ * + * @see com.lmax.disruptor.WaitStrategy + * @see com.lmax.disruptor.SleepingWaitStrategy + * @see com.lmax.disruptor.YieldingWaitStrategy + * @see com.lmax.disruptor.BlockingWaitStrategy + */ + @Test + @DisplayName("测试不同等待策略") + void testDifferentWaitStrategies() throws InterruptedException { + WaitStrategy[] strategies = { + new SleepingWaitStrategy(), + new YieldingWaitStrategy(), + new BlockingWaitStrategy() + }; + + for (WaitStrategy strategy : strategies) { + log.info("测试等待策略: {}", strategy.getClass().getSimpleName()); + + AtomicInteger processedCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(5); + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + strategy + ); + + // 注册事件处理器 + disruptor.handleEventsWith((event, sequence, endOfBatch) -> { + processedCount.incrementAndGet(); + latch.countDown(); + }); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 发布事件 + OrderEventProducer producer = new OrderEventProducer(ringBuffer); + for (int i = 1; i <= 5; i++) { + producer.publishEvent(i, i * 100.0, "WAIT_STRATEGY"); + } + + // 等待处理完成 + latch.await(); + + // 验证结果 + assertEquals(5, processedCount.get(), "应该处理 5 个事件"); + + log.info("等待策略 {} 测试通过", strategy.getClass().getSimpleName()); + + disruptor.shutdown(); + } + } + + /** + * 测试事件对象的复用 + *

+ * 验证点: + *

    + *
  • 事件对象被正确复用
  • + *
  • 对象引用不变,只修改内容
  • + *
+ *

+ * + * @see com.lmax.disruptor.EventFactory + * @see com.lmax.disruptor.RingBuffer#get(long) + */ + @Test + @DisplayName("测试事件对象复用") + void testEventObjectReuse() { + RingBuffer ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + 16, + new SleepingWaitStrategy() + ); + + // 获取第一个事件对象 + long seq1 = ringBuffer.next(); + OrderEvent event1 = ringBuffer.get(seq1); + event1.setOrderId(1); + event1.setAmount(100.0); + ringBuffer.publish(seq1); + + // 获取相同序列号的事件对象 + OrderEvent event1Again = ringBuffer.get(seq1); + + // 验证对象引用相同 + assertSame(event1, event1Again, "相同序列号应返回同一个对象引用"); + + // 获取环形位置相同的事件对象 + long seq2 = seq1 + 16; // 环形缓冲区大小为16,所以 seq2 的索引与 seq1 相同 + OrderEvent event2 = ringBuffer.get(seq2); + + // 验证对象引用相同(因为是环形复用) + assertSame(event1, event2, "环形位置相同应返回同一个对象引用"); + + log.info("事件对象复用测试通过 - event1 == event1Again: {}, event1 == event2: {}", + event1 == event1Again, event1 == event2); + } +} diff --git a/src/test/java/com/example/disruptor/DisruptorLearnApplicationTests.java b/src/test/java/com/example/disruptor/DisruptorLearnApplicationTests.java new file mode 100644 index 0000000..4d0acec --- /dev/null +++ b/src/test/java/com/example/disruptor/DisruptorLearnApplicationTests.java @@ -0,0 +1,13 @@ +package com.example.disruptor; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class DisruptorLearnApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/src/test/java/com/example/disruptor/RingBufferCoreTest.java b/src/test/java/com/example/disruptor/RingBufferCoreTest.java new file mode 100644 index 0000000..2e4df69 --- /dev/null +++ b/src/test/java/com/example/disruptor/RingBufferCoreTest.java @@ -0,0 +1,527 @@ +package com.example.disruptor; + +import com.example.disruptor.event.OrderEvent; +import com.example.disruptor.event.OrderEventFactory; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * RingBuffer 核心功能单元测试 + *

+ * 本测试类深入测试 RingBuffer 的核心实现细节,包括: + *

    + *
  • 环形数组的数据结构
  • + *
  • 序列号到索引的位运算转换
  • + *
  • 游标(Cursor)的管理
  • + *
  • 门控序列(Gating Sequence)的作用
  • + *
  • 缓冲区容量的计算
  • + *
+ *

+ * + *

测试重点:

+ *
    + *
  • 理解 RingBuffer 的内部实现机制
  • + *
  • 验证位运算的正确性和效率
  • + *
  • 测试边界条件和异常场景
  • + *
  • 验证线程安全性
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer + * @see com.lmax.disruptor.Sequence + * @see com.lmax.disruptor.Sequencer + */ +@Slf4j +@DisplayName("RingBuffer 核心功能测试") +class RingBufferCoreTest { + + /** + * 测试用的缓冲区大小,必须是2的幂次方 + */ + private static final int BUFFER_SIZE = 16; + + /** + * RingBuffer 实例 + */ + private RingBuffer ringBuffer; + + /** + * 测试前初始化 + */ + @BeforeEach + void setUp() { + ringBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + log.info("========== 测试开始 =========="); + } + + /** + * 测试后清理 + */ + @AfterEach + void tearDown() { + ringBuffer = null; + log.info("========== 测试结束 ==========\n"); + } + + /** + * 测试 RingBuffer 的基本属性 + *

+ * 验证点: + *

    + *
  • 缓冲区大小正确
  • + *
  • 初始游标为 -1
  • + *
  • 初始剩余容量等于缓冲区大小
  • + *
  • 缓冲区大小是2的幂次方
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#getBufferSize() + * @see com.lmax.disruptor.RingBuffer#getCursor() + * @see com.lmax.disruptor.RingBuffer#remainingCapacity() + */ + @Test + @DisplayName("测试 RingBuffer 基本属性") + void testRingBufferBasicProperties() { + // 验证缓冲区大小 + assertEquals(BUFFER_SIZE, ringBuffer.getBufferSize(), + "缓冲区大小应为 " + BUFFER_SIZE); + + // 验证缓冲区大小是2的幂次方 + assertTrue(isPowerOfTwo(BUFFER_SIZE), + "缓冲区大小必须是2的幂次方"); + + // 验证初始游标 + assertEquals(-1, ringBuffer.getCursor(), + "初始游标应为 -1"); + + // 验证初始剩余容量 + assertEquals(BUFFER_SIZE, ringBuffer.remainingCapacity(), + "初始剩余容量应为 " + BUFFER_SIZE); + + log.info("RingBuffer 基本属性测试通过 - 大小: {}, 游标: {}, 剩余容量: {}", + ringBuffer.getBufferSize(), ringBuffer.getCursor(), ringBuffer.remainingCapacity()); + } + + /** + * 测试序列号到索引的位运算转换 + *

+ * RingBuffer 使用位运算将序列号转换为数组索引: + *

+     * index = sequence & (bufferSize - 1)
+     * 
+ * 这种方式比取模运算(sequence % bufferSize)更高效。 + *

+ * + *

验证点:

+ *
    + *
  • 位运算结果正确
  • + *
  • 索引在有效范围内
  • + *
  • 环形循环正确
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer#get(long) + */ + @Test + @DisplayName("测试序列号到索引的位运算转换") + void testSequenceToIndexConversion() { + // 测试多个序列号 + long[] testSequences = {0, 1, 15, 16, 17, 31, 32, 33, 100, 101}; + + for (long sequence : testSequences) { + // 使用位运算计算索引 + int index = (int) (sequence & (BUFFER_SIZE - 1)); + + // 验证索引在有效范围内 + assertTrue(index >= 0 && index < BUFFER_SIZE, + "索引应在 [0, " + (BUFFER_SIZE - 1) + "] 范围内,实际为: " + index); + + // 验证位运算与取模运算结果一致 + int modIndex = (int) (sequence % BUFFER_SIZE); + assertEquals(modIndex, index, + "位运算与取模运算结果应一致: " + sequence + " & " + (BUFFER_SIZE - 1)); + + log.info("序列号: {}, 位运算索引: {}, 取模索引: {}", + sequence, index, modIndex); + } + + log.info("序列号到索引转换测试通过"); + } + + /** + * 测试游标(Cursor)的管理 + *

+ * 游标指向当前已发布的最大序列号。 + *

+ * + *

验证点:

+ *
    + *
  • 游标随着发布递增
  • + *
  • 游标值等于最后发布的序列号
  • + *
  • 游标不会回退
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer#getCursor() + */ + @Test + @DisplayName("测试游标管理") + void testCursorManagement() { + // 初始游标 + assertEquals(-1, ringBuffer.getCursor(), "初始游标应为 -1"); + + // 发布事件并验证游标 + for (long i = 0; i < 10; i++) { + long sequence = ringBuffer.next(); + ringBuffer.publish(sequence); + + assertEquals(i, ringBuffer.getCursor(), + "发布序列号 " + i + " 后,游标应为 " + i); + } + + // 验证游标不会回退 + long cursorBefore = ringBuffer.getCursor(); + long nextSequence = ringBuffer.next(); + ringBuffer.publish(nextSequence); + assertTrue(ringBuffer.getCursor() > cursorBefore, + "游标应该递增,不应回退"); + + log.info("游标管理测试通过 - 当前游标: {}", ringBuffer.getCursor()); + } + + /** + * 测试剩余容量的计算 + *

+ * 剩余容量表示当前可用的缓冲区槽数量。 + *

+ * + *

验证点:

+ *
    + *
  • 初始剩余容量等于缓冲区大小
  • + *
  • 发布事件后剩余容量减少
  • + *
  • 剩余容量不会小于0
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer#remainingCapacity() + */ + @Test + @DisplayName("测试剩余容量计算") + void testRemainingCapacity() { + // 创建新的 RingBuffer 进行测试,避免受之前测试影响 + RingBuffer testRingBuffer = RingBuffer.create( + ProducerType.SINGLE, + new OrderEventFactory(), + BUFFER_SIZE, + new SleepingWaitStrategy() + ); + + // 获取初始剩余容量 + long initialCapacity = testRingBuffer.remainingCapacity(); + assertEquals(BUFFER_SIZE, initialCapacity, "初始剩余容量应为 " + BUFFER_SIZE); + + // 添加一个虚拟的 gating sequence 来模拟消费者 + Sequence gatingSequence = new Sequence(-1); + testRingBuffer.addGatingSequences(gatingSequence); + + // 发布事件 + for (int i = 1; i <= 5; i++) { + long sequence = testRingBuffer.next(); + testRingBuffer.publish(sequence); + } + + // 验证剩余容量减少(由于有消费者但未消费,容量会减少) + long finalCapacity = testRingBuffer.remainingCapacity(); + assertTrue(finalCapacity < initialCapacity, + "发布事件后剩余容量应该减少,初始: " + initialCapacity + ", 最终: " + finalCapacity); + + log.info("剩余容量测试通过 - 初始容量: {}, 最终容量: {}", initialCapacity, finalCapacity); + } + + /** + * 测试环形循环特性 + *

+ * 当序列号超过缓冲区大小时,索引会循环回到0。 + *

+ * + *

验证点:

+ *
    + *
  • 序列号持续递增
  • + *
  • 索引在 [0, bufferSize-1] 范围内循环
  • + *
  • 索引计算正确
  • + *
+ * + * @see com.lmax.disruptor.RingBuffer#get(long) + */ + @Test + @DisplayName("测试环形循环特性") + void testRingBufferCircularity() { + // 发布超过缓冲区容量的事件 + int totalEvents = BUFFER_SIZE * 2 + 5; + + for (long i = 0; i < totalEvents; i++) { + long sequence = ringBuffer.next(); + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + ringBuffer.publish(sequence); + + // 验证索引循环 + int index = (int) (sequence & (BUFFER_SIZE - 1)); + assertTrue(index >= 0 && index < BUFFER_SIZE, + "索引应在有效范围内: " + index); + + // 验证序列号持续递增 + assertEquals(i, ringBuffer.getCursor(), + "序列号应为 " + i); + } + + // 验证特定序列号的索引 + assertEquals(0, (int) (BUFFER_SIZE & (BUFFER_SIZE - 1)), + "序列号 " + BUFFER_SIZE + " 的索引应为 0"); + assertEquals(1, (int) ((BUFFER_SIZE + 1) & (BUFFER_SIZE - 1)), + "序列号 " + (BUFFER_SIZE + 1) + " 的索引应为 1"); + assertEquals(0, (int) ((BUFFER_SIZE * 2) & (BUFFER_SIZE - 1)), + "序列号 " + (BUFFER_SIZE * 2) + " 的索引应为 0"); + + log.info("环形循环测试通过 - 总事件数: {}, 当前游标: {}", + totalEvents, ringBuffer.getCursor()); + } + + /** + * 测试事件对象的获取和设置 + *

+ * 验证点: + *

    + *
  • 能够正确获取事件对象
  • + *
  • 能够正确设置事件属性
  • + *
  • 事件对象不为 null
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#get(long) + */ + @Test + @DisplayName("测试事件对象获取和设置") + void testEventObjectGetAndSet() { + // 获取事件对象 + long sequence = ringBuffer.next(); + OrderEvent event = ringBuffer.get(sequence); + + // 验证事件对象不为 null + assertNotNull(event, "事件对象不应为 null"); + + // 设置事件属性 + event.setOrderId(123); + event.setAmount(456.78); + event.setOrderType("TEST"); + event.setTimestamp(System.nanoTime()); + + // 验证属性设置正确 + assertEquals(123, event.getOrderId(), "订单ID应为 123"); + assertEquals(456.78, event.getAmount(), 0.001, "金额应为 456.78"); + assertEquals("TEST", event.getOrderType(), "订单类型应为 TEST"); + + // 发布事件 + ringBuffer.publish(sequence); + + // 再次获取相同序列号的事件对象 + OrderEvent eventAgain = ringBuffer.get(sequence); + + // 验证是同一个对象引用 + assertSame(event, eventAgain, "相同序列号应返回同一个对象引用"); + + // 验证数据一致 + assertEquals(123, eventAgain.getOrderId(), "订单ID应为 123"); + assertEquals(456.78, eventAgain.getAmount(), 0.001, "金额应为 456.78"); + + log.info("事件对象获取和设置测试通过"); + } + + /** + * 测试批量获取序列号 + *

+ * 验证点: + *

    + *
  • 能够批量获取连续的序列号
  • + *
  • 序列号连续且正确
  • + *
  • 批量发布功能正常
  • + *
+ *

+ * + * @see com.lmax.disruptor.RingBuffer#next(int) + * @see com.lmax.disruptor.RingBuffer#publish(long, int) + */ + @Test + @DisplayName("测试批量获取序列号") + void testBatchSequenceAllocation() { + int batchSize = 5; + + // 批量获取序列号 + long startSequence = ringBuffer.next(batchSize); + + // 验证起始序列号(由于之前测试可能已经使用了序列号,这里只验证不为负) + assertTrue(startSequence >= 0, "起始序列号不应为负"); + + // 填充事件 + for (int i = 0; i < batchSize; i++) { + OrderEvent event = ringBuffer.get(startSequence + i); + event.setOrderId(i + 1); + event.setAmount((i + 1) * 100.0); + } + + // 批量发布 + ringBuffer.publish(startSequence, batchSize); + + // 验证游标(由于之前测试可能已经发布过事件,这里只验证游标递增) + long cursorBeforeBatch = startSequence - 1; + assertTrue(ringBuffer.getCursor() >= cursorBeforeBatch, + "游标应该大于等于批次前的游标"); + + // 再次批量获取 + long nextStartSequence = ringBuffer.next(batchSize); + assertTrue(nextStartSequence >= startSequence + batchSize, + "下一次批量获取的起始序列号应该大于等于 " + (startSequence + batchSize)); + + log.info("批量序列号分配测试通过 - 批次大小: {}", batchSize); + } + + /** + * 测试缓冲区大小必须是2的幂次方 + *

+ * 验证点: + *

    + *
  • 能够正确判断一个数是否为2的幂次方
  • + *
  • 位运算判断方法正确
  • + *
+ *

+ */ + @Test + @DisplayName("测试2的幂次方判断") + void testPowerOfTwo() { + // 测试2的幂次方 + int[] powersOfTwo = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024}; + for (int n : powersOfTwo) { + assertTrue(isPowerOfTwo(n), n + " 应该是2的幂次方"); + } + + // 测试非2的幂次方 + int[] notPowersOfTwo = {0, 3, 5, 6, 7, 9, 10, 15, 17, 100, 1000}; + for (int n : notPowersOfTwo) { + assertFalse(isPowerOfTwo(n), n + " 不应该是2的幂次方"); + } + + log.info("2的幂次方判断测试通过"); + } + + /** + * 测试序列号的边界情况 + *

+ * 验证点: + *

    + *
  • 序列号0的处理
  • + *
  • 序列号Long.MAX_VALUE附近的处理
  • + *
  • 序列号溢出后的处理
  • + *
+ *

+ */ + @Test + @DisplayName("测试序列号边界情况") + void testSequenceBoundaryConditions() { + // 测试序列号0 + long seq0 = ringBuffer.next(); + assertEquals(0, seq0, "第一个序列号应为 0"); + ringBuffer.publish(seq0); + + // 测试序列号1 + long seq1 = ringBuffer.next(); + assertEquals(1, seq1, "第二个序列号应为 1"); + ringBuffer.publish(seq1); + + // 测试接近Long.MAX_VALUE的序列号 + // 注意:实际应用中不会达到这个值,但测试边界情况很重要 + long largeSequence = Long.MAX_VALUE - 10; + int index = (int) (largeSequence & (BUFFER_SIZE - 1)); + assertTrue(index >= 0 && index < BUFFER_SIZE, + "大序列号的索引应在有效范围内"); + + log.info("序列号边界情况测试通过"); + } + + /** + * 测试事件对象的预分配 + *

+ * RingBuffer 在初始化时会预分配所有事件对象。 + *

+ * + *

验证点:

+ *
    + *
  • 所有位置的事件对象都已预分配
  • + *
  • 事件对象不为 null
  • + *
  • 事件对象可以正常访问
  • + *
+ * + * @see com.lmax.disruptor.EventFactory + */ + @Test + @DisplayName("测试事件对象预分配") + void testEventPreallocation() { + // 访问所有位置的事件对象 + for (int i = 0; i < BUFFER_SIZE; i++) { + OrderEvent event = ringBuffer.get(i); + assertNotNull(event, "位置 " + i + " 的事件对象不应为 null"); + } + + log.info("事件对象预分配测试通过 - 所有位置的对象都已预分配"); + } + + /** + * 测试序列号的连续性 + *

+ * 验证点: + *

    + *
  • next() 返回的序列号连续递增
  • + *
  • 没有序列号重复或跳跃
  • + *
+ *

+ */ + @Test + @DisplayName("测试序列号连续性") + void testSequenceContinuity() { + long expectedSequence = 0; + int iterations = 100; + + for (int i = 0; i < iterations; i++) { + long sequence = ringBuffer.next(); + assertEquals(expectedSequence, sequence, + "序列号应为 " + expectedSequence + ",实际为 " + sequence); + ringBuffer.publish(sequence); + expectedSequence++; + } + + assertEquals(iterations, expectedSequence, + "最终序列号应为 " + iterations); + + log.info("序列号连续性测试通过 - 迭代次数: {}", iterations); + } + + /** + * 辅助方法:判断一个数是否为2的幂次方 + *

+ * 使用位运算:n & (n-1) == 0 + *

+ * + * @param n 要判断的数字 + * @return 如果是2的幂次方返回 true,否则返回 false + */ + private boolean isPowerOfTwo(int n) { + return n > 0 && (n & (n - 1)) == 0; + } +} diff --git a/src/test/java/com/example/disruptor/WaitStrategyPerformanceTest.java b/src/test/java/com/example/disruptor/WaitStrategyPerformanceTest.java new file mode 100644 index 0000000..499fedd --- /dev/null +++ b/src/test/java/com/example/disruptor/WaitStrategyPerformanceTest.java @@ -0,0 +1,330 @@ +package com.example.disruptor; + +import com.example.disruptor.event.OrderEvent; +import com.example.disruptor.event.OrderEventFactory; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.ProducerType; +import org.junit.jupiter.api.Assertions; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 等待策略性能对比测试 + *

+ * 本测试类对比不同等待策略的性能特征,包括: + *

    + *
  • BlockingWaitStrategy - 使用锁和条件变量,CPU使用低,延迟高
  • + *
  • SleepingWaitStrategy - 自旋后睡眠,平衡性能和CPU使用
  • + *
  • YieldingWaitStrategy - 自旋后让出CPU,低延迟,高CPU使用
  • + *
  • BusySpinWaitStrategy - 持续自旋,最低延迟,极高CPU使用
  • + *
  • PhasedBackoffWaitStrategy - 自适应等待策略
  • + *
+ *

+ * + *

性能指标:

+ *
    + *
  • 吞吐量(Throughput) - 每秒处理的事件数量
  • + *
  • 延迟(Latency) - 事件从发布到消费的时间
  • + *
  • CPU使用率 - 等待期间CPU的使用情况
  • + *
+ * + * @see com.lmax.disruptor.WaitStrategy + * @see com.lmax.disruptor.BlockingWaitStrategy + * @see com.lmax.disruptor.SleepingWaitStrategy + * @see com.lmax.disruptor.YieldingWaitStrategy + * @see com.lmax.disruptor.BusySpinWaitStrategy + * @see com.lmax.disruptor.PhasedBackoffWaitStrategy + */ +@Slf4j +@DisplayName("等待策略性能对比测试") +class WaitStrategyPerformanceTest { + + /** + * 缓冲区大小 + */ + private static final int BUFFER_SIZE = 1024; + + /** + * 测试事件数量 + */ + private static final int EVENT_COUNT = 100_000; + + /** + * 线程工厂 + */ + private ThreadFactory threadFactory; + + /** + * 线程池 + */ + private ExecutorService executorService; + + /** + * 测试前初始化 + */ + @BeforeEach + void setUp() { + AtomicInteger threadNumber = new AtomicInteger(1); + threadFactory = r -> { + Thread thread = new Thread(r); + thread.setName("perf-test-" + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + }; + executorService = Executors.newCachedThreadPool(threadFactory); + log.info("========== 性能测试开始 =========="); + } + + /** + * 测试后清理 + */ + @AfterEach + void tearDown() { + if (executorService != null && !executorService.isShutdown()) { + executorService.shutdown(); + } + log.info("========== 性能测试结束 ==========\n"); + } + + /** + * 测试 BlockingWaitStrategy + *

+ * 特点: + *

    + *
  • 使用锁和条件变量
  • + *
  • CPU使用率最低
  • + *
  • 延迟较高
  • + *
  • 适用于对延迟不敏感的场景
  • + *
+ *

+ * + * @see com.lmax.disruptor.BlockingWaitStrategy + */ + @Test + @DisplayName("测试 BlockingWaitStrategy 性能") + void testBlockingWaitStrategyPerformance() throws InterruptedException { + runPerformanceTest("BlockingWaitStrategy", new BlockingWaitStrategy()); + } + + /** + * 测试 SleepingWaitStrategy + *

+ * 特点: + *

    + *
  • 先自旋100次
  • + *
  • 然后调用 Thread.yield()
  • + *
  • 最后使用 LockSupport.parkNanos()
  • + *
  • 平衡性能和CPU使用
  • + *
  • 适用于大多数场景
  • + *
+ *

+ * + * @see com.lmax.disruptor.SleepingWaitStrategy + */ + @Test + @DisplayName("测试 SleepingWaitStrategy 性能") + void testSleepingWaitStrategyPerformance() throws InterruptedException { + runPerformanceTest("SleepingWaitStrategy", new SleepingWaitStrategy()); + } + + /** + * 测试 YieldingWaitStrategy + *

+ * 特点: + *

    + *
  • 先自旋100次
  • + *
  • 然后调用 Thread.yield()
  • + *
  • 低延迟
  • + *
  • CPU使用率高
  • + *
  • 适用于低延迟场景
  • + *
+ *

+ * + * @see com.lmax.disruptor.YieldingWaitStrategy + */ + @Test + @DisplayName("测试 YieldingWaitStrategy 性能") + void testYieldingWaitStrategyPerformance() throws InterruptedException { + runPerformanceTest("YieldingWaitStrategy", new YieldingWaitStrategy()); + } + + /** + * 测试 BusySpinWaitStrategy + *

+ * 特点: + *

    + *
  • 持续自旋等待
  • + *
  • 最低延迟
  • + *
  • 极高CPU使用率
  • + *
  • 仅适用于超低延迟场景
  • + *
+ *

+ * + * @see com.lmax.disruptor.BusySpinWaitStrategy + */ + @Test + @DisplayName("测试 BusySpinWaitStrategy 性能") + void testBusySpinWaitStrategyPerformance() throws InterruptedException { + runPerformanceTest("BusySpinWaitStrategy", new BusySpinWaitStrategy()); + } + + /** + * 测试 PhasedBackoffWaitStrategy + *

+ * 特点: + *

    + *
  • 自适应等待策略
  • + *
  • 在自旋和睡眠之间自动切换
  • + *
  • 适用于负载不稳定的场景
  • + *
+ *

+ * + * @see com.lmax.disruptor.PhasedBackoffWaitStrategy + */ + @Test + @DisplayName("测试 PhasedBackoffWaitStrategy 性能") + void testPhasedBackoffWaitStrategyPerformance() throws InterruptedException { + WaitStrategy strategy = new PhasedBackoffWaitStrategy( + 1, 1000, TimeUnit.NANOSECONDS, + new SleepingWaitStrategy() + ); + runPerformanceTest("PhasedBackoffWaitStrategy", strategy); + } + + /** + * 运行性能测试 + *

+ * 测试流程: + *

    + *
  1. 创建 Disruptor 实例
  2. + *
  3. 注册消费者
  4. + *
  5. 发布指定数量的事件
  6. + *
  7. 等待所有事件被处理
  8. + *
  9. 计算性能指标
  10. + *
+ *

+ * + * @param strategyName 策略名称 + * @param strategy 等待策略 + */ + private void runPerformanceTest(String strategyName, WaitStrategy strategy) throws InterruptedException { + log.info("开始测试 {} 性能", strategyName); + + AtomicInteger processedCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(EVENT_COUNT); + AtomicLong totalLatency = new AtomicLong(0); + + // 创建 Disruptor + Disruptor disruptor = new Disruptor<>( + new OrderEventFactory(), + BUFFER_SIZE, + threadFactory, + ProducerType.SINGLE, + strategy + ); + + // 注册消费者 + disruptor.handleEventsWith((event, sequence, endOfBatch) -> { + // 计算延迟 + long latency = System.nanoTime() - event.getTimestamp(); + totalLatency.addAndGet(latency); + + processedCount.incrementAndGet(); + latch.countDown(); + }); + + // 启动 Disruptor + RingBuffer ringBuffer = disruptor.start(); + + // 开始计时 + long startTime = System.nanoTime(); + + // 发布事件 + for (int i = 0; i < EVENT_COUNT; i++) { + long sequence = ringBuffer.next(); + try { + OrderEvent event = ringBuffer.get(sequence); + event.setOrderId(i); + event.setAmount(i * 100.0); + event.setOrderType("PERF_TEST"); + event.setTimestamp(System.nanoTime()); + } finally { + ringBuffer.publish(sequence); + } + } + + // 等待所有事件被处理 + latch.await(); + + // 结束计时 + long endTime = System.nanoTime(); + long durationNanos = endTime - startTime; + double durationSeconds = durationNanos / 1_000_000_000.0; + + // 计算性能指标 + long throughput = (long) (EVENT_COUNT / durationSeconds); + double avgLatencyMicros = (totalLatency.get() / (double) EVENT_COUNT) / 1000.0; + + // 输出结果 + log.info("========== {} 性能测试结果 ==========", strategyName); + log.info("事件数量: {}", EVENT_COUNT); + log.info("总耗时: {:.3f} 秒", durationSeconds); + log.info("吞吐量: {} 事件/秒", throughput); + log.info("平均延迟: {:.3f} 微秒", avgLatencyMicros); + log.info("=========================================="); + + // 验证所有事件都被处理 + Assertions.assertEquals(EVENT_COUNT, processedCount.get(), + strategyName + " 应该处理所有事件"); + + disruptor.shutdown(); + } + + /** + * 对比所有等待策略的性能 + *

+ * 本测试一次性运行所有等待策略,便于对比性能差异。 + *

+ */ + @Test + @DisplayName("对比所有等待策略性能") + void testCompareAllWaitStrategies() throws InterruptedException { + log.info("========== 开始对比所有等待策略性能 =========="); + + WaitStrategy[] strategies = { + new BlockingWaitStrategy(), + new SleepingWaitStrategy(), + new YieldingWaitStrategy(), + new BusySpinWaitStrategy(), + new PhasedBackoffWaitStrategy(1, 1000, TimeUnit.NANOSECONDS, new SleepingWaitStrategy()) + }; + + String[] strategyNames = { + "BlockingWaitStrategy", + "SleepingWaitStrategy", + "YieldingWaitStrategy", + "BusySpinWaitStrategy", + "PhasedBackoffWaitStrategy" + }; + + for (int i = 0; i < strategies.length; i++) { + runPerformanceTest(strategyNames[i], strategies[i]); + log.info(""); + } + + log.info("========== 所有等待策略性能对比完成 =========="); + } +}