7 Star 6 Fork 24

src-openEuler/kafka
关闭

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
0007-fix-payload-incorrectly.patch 2.08 KB
一键复制 编辑 原始数据 按行查看 历史
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 12a0fccea8..3c5f63df18 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -112,12 +112,10 @@ public class ProducerPerformance {
/* setup perf test */
byte[] payload = null;
- Random random = new Random(0);
if (recordSize != null) {
payload = new byte[recordSize];
- for (int i = 0; i < payload.length; ++i)
- payload[i] = (byte) (random.nextInt(26) + 65);
}
+ Random random = new Random(0);
ProducerRecord<byte[], byte[]> record;
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
@@ -127,15 +125,20 @@ public class ProducerPerformance {
int currentTransactionSize = 0;
long transactionStartTime = 0;
for (long i = 0; i < numRecords; i++) {
+ if (payloadFilePath != null) {
+ payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+ } else if (recordSize != null) {
+ for (int j = 0; j < payload.length; ++j)
+ payload[j] = (byte) (random.nextInt(26) + 65);
+ } else {
+ throw new IllegalArgumentException("no payload File Path or record Size provided");
+ }
+
if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}
-
- if (payloadFilePath != null) {
- payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
- }
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/src-openeuler/kafka.git
git@gitee.com:src-openeuler/kafka.git
src-openeuler
kafka
kafka
master

搜索帮助