From 3c976d4856a75ffa38a59e4c98cb101a5486264c Mon Sep 17 00:00:00 2001 From: User Date: Fri, 8 Jun 2018 11:42:48 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E5=BB=B6=E6=97=B6=E8=B5=B0timer=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../GenerateCompleteRequestHandler.java | 35 ++++++++----------- .../pbft/listener/PrepareEventListener.java | 13 +++---- .../pbft/queue/AbstractVoteMsgQueue.java | 11 +++--- .../socket/pbft/queue/PreMsgQueue.java | 35 +++++++++++-------- .../socket/pbft/queue/PrepareMsgQueue.java | 2 +- 5 files changed, 45 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java b/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java index ee482ec..36da8f2 100644 --- a/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java @@ -1,18 +1,18 @@ package com.mindata.blockchain.socket.handler.server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tio.core.ChannelContext; + import com.mindata.blockchain.ApplicationContextProvider; import com.mindata.blockchain.block.Block; +import com.mindata.blockchain.common.TimerManager; import com.mindata.blockchain.core.manager.DbBlockManager; import com.mindata.blockchain.socket.base.AbstractBlockHandler; import com.mindata.blockchain.socket.body.RpcSimpleBlockBody; import com.mindata.blockchain.socket.client.PacketSender; import com.mindata.blockchain.socket.packet.BlockPacket; import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.tio.core.ChannelContext; - -import java.util.concurrent.CompletableFuture; /** * 已生成了新区块的全网广播 @@ -33,23 +33,18 @@ public class GenerateCompleteRequestHandler extends AbstractBlockHandler { - try { - Thread.sleep(2000); - Block block = ApplicationContextProvider.getBean(DbBlockManager.class).getBlockByHash(rpcBlockBody - .getHash()); - //本地有了 - if (block == null) { - logger.info("开始去获取别人的新区块"); - //在这里发请求,去获取group别人的新区块 - BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); - ApplicationContextProvider.getBean(PacketSender.class).sendGroup(nextBlockPacket); - } - } catch (InterruptedException e) { - e.printStackTrace(); + TimerManager.schedule(() -> { + Block block = ApplicationContextProvider.getBean(DbBlockManager.class).getBlockByHash(rpcBlockBody + .getHash()); + //本地有了 + if (block == null) { + logger.info("开始去获取别人的新区块"); + //在这里发请求,去获取group别人的新区块 + BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); + ApplicationContextProvider.getBean(PacketSender.class).sendGroup(nextBlockPacket); } return null; - }); + },2000); return null; } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java b/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java index 11da0af..59ff448 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java @@ -1,18 +1,17 @@ package com.mindata.blockchain.socket.pbft.listener; -import com.mindata.blockchain.common.AppId; +import javax.annotation.Resource; + +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + import com.mindata.blockchain.socket.body.VoteBody; import com.mindata.blockchain.socket.client.PacketSender; import com.mindata.blockchain.socket.packet.BlockPacket; import com.mindata.blockchain.socket.packet.PacketBuilder; import com.mindata.blockchain.socket.packet.PacketType; -import com.mindata.blockchain.socket.pbft.VoteType; import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; import com.mindata.blockchain.socket.pbft.msg.VoteMsg; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; /** * @author wuweifeng wrote on 2018/4/25. @@ -31,8 +30,6 @@ public class PrepareEventListener { @EventListener public void msgIsPrepare(MsgPrepareEvent msgPrepareEvent) { VoteMsg voteMsg = (VoteMsg) msgPrepareEvent.getSource(); - voteMsg.setVoteType(VoteType.PREPARE); - voteMsg.setAppId(AppId.value); //群发消息,通知别的节点,我已对该Block Prepare BlockPacket blockPacket = new PacketBuilder<>().setType(PacketType.PBFT_VOTE).setBody(new diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java index 85ecdea..0959f20 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java @@ -1,6 +1,8 @@ package com.mindata.blockchain.socket.pbft.queue; import cn.hutool.core.collection.CollectionUtil; + +import com.mindata.blockchain.common.TimerManager; import com.mindata.blockchain.socket.pbft.msg.VoteMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,12 +86,7 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { * 清理旧的block的hash */ protected void clearOldBlockHash(int number) { - CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + TimerManager.schedule(() -> { for (String key : voteMsgConcurrentHashMap.keySet()) { if (voteMsgConcurrentHashMap.get(key).get(0).getNumber() <= number) { voteMsgConcurrentHashMap.remove(key); @@ -97,6 +94,6 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { } } return null; - }); + },2000); } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java index fcd4da2..262c9f6 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java @@ -1,10 +1,9 @@ package com.mindata.blockchain.socket.pbft.queue; -import com.mindata.blockchain.block.Block; -import com.mindata.blockchain.core.event.AddBlockEvent; -import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; -import com.mindata.blockchain.socket.pbft.msg.VoteMsg; -import com.mindata.blockchain.socket.pbft.msg.VotePreMsg; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.Resource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; @@ -12,9 +11,16 @@ import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; +import com.mindata.blockchain.block.Block; +import com.mindata.blockchain.common.AppId; +import com.mindata.blockchain.common.TimerManager; +import com.mindata.blockchain.core.event.AddBlockEvent; +import com.mindata.blockchain.socket.pbft.VoteType; +import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; +import com.mindata.blockchain.socket.pbft.msg.VoteMsg; +import com.mindata.blockchain.socket.pbft.msg.VotePreMsg; + +import cn.hutool.core.bean.BeanUtil; /** * preprepare消息的存储,但凡收到请求生成Block的信息,都在这里处理 @@ -51,6 +57,10 @@ public class PreMsgQueue extends BaseMsgQueue { blockConcurrentHashMap.put(hash, votePreMsg); //加入Prepare行列,推送给所有人 + VoteMsg prepareMsg = new VoteMsg(); + BeanUtil.copyProperties(voteMsg, prepareMsg); + prepareMsg.setVoteType(VoteType.PREPARE); + prepareMsg.setAppId(AppId.value); eventPublisher.publishEvent(new MsgPrepareEvent(voteMsg)); } @@ -77,18 +87,13 @@ public class PreMsgQueue extends BaseMsgQueue { public void blockGenerated(AddBlockEvent addBlockEvent) { Block block = (Block) addBlockEvent.getSource(); int number = block.getBlockHeader().getNumber(); - CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + TimerManager.schedule(() -> { for (String key : blockConcurrentHashMap.keySet()) { if (blockConcurrentHashMap.get(key).getNumber() <= number) { blockConcurrentHashMap.remove(key); } } return null; - }); + },2000); } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java index 1f2fb39..296d479 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java @@ -71,8 +71,8 @@ public class PrepareMsgQueue extends AbstractVoteMsgQueue { logger.info("Prepare阶段完毕,是否进入commit的标志是:" + flag); //发出拒绝commit的消息 commitMsg.setAgree(flag); - eventPublisher.publishEvent(new MsgCommitEvent(commitMsg)); voteStateConcurrentHashMap.put(commitMsg.getHash(), flag); + eventPublisher.publishEvent(new MsgCommitEvent(commitMsg)); } /** -- Gitee From 87971fe34aa8f00bb6dbe1b9d8723877a30fced4 Mon Sep 17 00:00:00 2001 From: andylo25 Date: Mon, 11 Jun 2018 09:56:41 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E5=BB=B6=E6=97=B6=E6=89=A7=E8=A1=8C=E8=B5=B0timer?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../blockchain/common/timer/TimerManager.java | 46 +++++++++++++++++++ .../GenerateCompleteRequestHandler.java | 2 +- .../pbft/queue/AbstractVoteMsgQueue.java | 14 +++--- .../socket/pbft/queue/PreMsgQueue.java | 2 +- 4 files changed, 55 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/mindata/blockchain/common/timer/TimerManager.java diff --git a/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java b/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java new file mode 100644 index 0000000..914ad6b --- /dev/null +++ b/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java @@ -0,0 +1,46 @@ +package com.mindata.blockchain.common.timer; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * 异步延时执行 + * 需要调用延迟执行的时候可使用,避免在代码里面直接调用Thread.sleep()方法 + * + * @author andylo25 wrote on 2018/6/11. + */ +public class TimerManager { + + private volatile static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3); + + public static void schedule(Supplier action, long delay){ + executorService.schedule(new Runnable() { + @Override + public void run() { + action.get(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + public static void scheduleAtFixedRate(Supplier action,long initialDelay, long period ){ + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + action.get(); + } + }, initialDelay,period, TimeUnit.MILLISECONDS); + } + + public static void scheduleWithFixedDelay(Supplier action,long initialDelay, long period ){ + executorService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + action.get(); + } + }, initialDelay,period, TimeUnit.MILLISECONDS); + } + + +} diff --git a/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java b/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java index 36da8f2..576e2a3 100644 --- a/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/handler/server/GenerateCompleteRequestHandler.java @@ -6,7 +6,7 @@ import org.tio.core.ChannelContext; import com.mindata.blockchain.ApplicationContextProvider; import com.mindata.blockchain.block.Block; -import com.mindata.blockchain.common.TimerManager; +import com.mindata.blockchain.common.timer.TimerManager; import com.mindata.blockchain.core.manager.DbBlockManager; import com.mindata.blockchain.socket.base.AbstractBlockHandler; import com.mindata.blockchain.socket.body.RpcSimpleBlockBody; diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java index 0959f20..8e53d00 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java @@ -1,16 +1,16 @@ package com.mindata.blockchain.socket.pbft.queue; -import cn.hutool.core.collection.CollectionUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; -import com.mindata.blockchain.common.TimerManager; -import com.mindata.blockchain.socket.pbft.msg.VoteMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; +import com.mindata.blockchain.common.timer.TimerManager; +import com.mindata.blockchain.socket.pbft.msg.VoteMsg; + +import cn.hutool.core.collection.CollectionUtil; /** * @author wuweifeng wrote on 2018/4/26. diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java index 262c9f6..bce3233 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java @@ -13,7 +13,7 @@ import org.springframework.stereotype.Component; import com.mindata.blockchain.block.Block; import com.mindata.blockchain.common.AppId; -import com.mindata.blockchain.common.TimerManager; +import com.mindata.blockchain.common.timer.TimerManager; import com.mindata.blockchain.core.event.AddBlockEvent; import com.mindata.blockchain.socket.pbft.VoteType; import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; -- Gitee From 4b6a7c166372a5c8294094111d1c8c054e55246d Mon Sep 17 00:00:00 2001 From: andy Date: Mon, 11 Jun 2018 23:18:45 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9maven=E5=BA=93=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=85=88=E4=BD=BF=E7=94=A8aliyun?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pom.xml b/pom.xml index 87bda85..bb0ae3f 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,19 @@ 1.8 1.2.29 + + + + ali-repos + ali Repository + http://maven.aliyun.com/nexus/content/groups/public + + + center-repos + center Repository + http://repo1.maven.org/maven2 + + -- Gitee From 1cb00abfa210a4649bea076b616e6537a834a666 Mon Sep 17 00:00:00 2001 From: andylo25 Date: Tue, 12 Jun 2018 16:04:20 +0800 Subject: [PATCH 4/6] =?UTF-8?q?1=EF=BC=8C=E4=BB=A3=E7=A0=81=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E3=80=822=EF=BC=8C=E5=A2=9E=E5=8A=A0=E6=89=A7?= =?UTF-8?q?=E8=A1=8Csql=E8=84=9A=E6=9C=AC=E7=9A=84=E9=A2=84=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E3=80=823=EF=BC=8C=E4=BF=AE=E5=A4=8DDisruptor?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=89=A7=E8=A1=8C=E5=BC=82=E5=B8=B8=E5=90=8E?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=8C=82=E6=8E=89=EF=BC=8C4=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=98=AF=E5=90=A6=E5=8D=95=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E9=85=8D=E7=BD=AE=EF=BC=9AsingeNode,5,?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E3=80=90=E6=9B=B4=E6=96=B0=E3=80=91=E3=80=90?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E3=80=91test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/controller/BlockController.java | 61 ++++++++++++++++++- .../core/service/InstructionService.java | 3 +- .../blockchain/core/sqlite/SqliteManager.java | 15 ++++- .../socket/client/ClientStarter.java | 3 + .../distruptor/DisruptorServerHandler.java | 12 +++- .../pbft/queue/AbstractVoteMsgQueue.java | 2 +- .../socket/pbft/queue/CommitMsgQueue.java | 19 +++--- .../socket/pbft/queue/PreMsgQueue.java | 17 +++++- .../socket/pbft/queue/PrepareMsgQueue.java | 5 -- 9 files changed, 112 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/mindata/blockchain/core/controller/BlockController.java b/src/main/java/com/mindata/blockchain/core/controller/BlockController.java index 0b759ea..eed6922 100644 --- a/src/main/java/com/mindata/blockchain/core/controller/BlockController.java +++ b/src/main/java/com/mindata/blockchain/core/controller/BlockController.java @@ -2,6 +2,7 @@ package com.mindata.blockchain.core.controller; import javax.annotation.Resource; +import org.apache.commons.lang3.StringUtils; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PageableDefault; import org.springframework.web.bind.annotation.GetMapping; @@ -72,7 +73,7 @@ public class BlockController { } /** - * 测试生成一个Block,公钥私钥可以通过PairKeyController来生成 + * 测试生成一个insert:Block,公钥私钥可以通过PairKeyController来生成 * @param content * sql内容 */ @@ -87,14 +88,68 @@ public class BlockController { Instruction instruction = instructionService.build(instructionBody); BlockRequestBody blockRequestBody = new BlockRequestBody(); - blockRequestBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); - blockRequestBody.setBlockBody(blockBody); + blockRequestBody.setBlockBody(blockBody); return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); } + + /** + * 测试生成一个update:Block,公钥私钥可以通过PairKeyController来生成 + * @param id 更新的主键 + * @param content + * sql内容 + */ + @GetMapping("testUpdate") + public BaseData testUpdate(String id,String content) throws Exception { + if(StringUtils.isBlank(id)) ResultGenerator.genSuccessResult("主键不可为空"); + InstructionBody instructionBody = new InstructionBody(); + instructionBody.setOperation(Operation.UPDATE); + instructionBody.setTable("message"); + instructionBody.setInstructionId(id); + instructionBody.setJson("{\"content\":\"" + content + "\"}"); + instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8="); + Instruction instruction = instructionService.build(instructionBody); + + BlockRequestBody blockRequestBody = new BlockRequestBody(); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); + com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); + blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); + + blockRequestBody.setBlockBody(blockBody); + + return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); + } + + /** + * 测试生成一个delete:Block,公钥私钥可以通过PairKeyController来生成 + * @param id 待删除记录的主键 + * sql内容 + */ + @GetMapping("testDel") + public BaseData testDel(String id) throws Exception { + if(StringUtils.isBlank(id)) ResultGenerator.genSuccessResult("主键不可为空"); + InstructionBody instructionBody = new InstructionBody(); + instructionBody.setOperation(Operation.DELETE); + instructionBody.setTable("message"); + instructionBody.setInstructionId(id); + instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8="); + Instruction instruction = instructionService.build(instructionBody); + + BlockRequestBody blockRequestBody = new BlockRequestBody(); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); + com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); + blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); + + blockRequestBody.setBlockBody(blockBody); + + return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); + } /** * 查询已落地的sqlite里的所有数据 diff --git a/src/main/java/com/mindata/blockchain/core/service/InstructionService.java b/src/main/java/com/mindata/blockchain/core/service/InstructionService.java index ee9c72e..ba2ff7b 100644 --- a/src/main/java/com/mindata/blockchain/core/service/InstructionService.java +++ b/src/main/java/com/mindata/blockchain/core/service/InstructionService.java @@ -71,8 +71,7 @@ public class InstructionService { private String getSignString(Instruction instruction) { return instruction.getOperation() + instruction.getTable() + instruction - .getInstructionId() + (instruction.getJson()==null?"":instruction.getJson()) - + (instruction.getOldJson()==null?"":instruction.getOldJson()); + .getInstructionId() + (instruction.getJson()==null?"":instruction.getJson()); } /** diff --git a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java index 184cf52..ca69497 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java +++ b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.ArrayList; @@ -93,7 +94,7 @@ public class SqliteManager { * @param block * block */ - private void rollBack(Block block) { + public void rollBack(Block block) { List instructions = block.getBlockBody().getInstructions(); int size = instructions.size(); //需要对语句集合进行反转,然后执行和execute一样的操作 @@ -109,4 +110,16 @@ public class SqliteManager { instructionParser.parse(instruction); } } + + /** + * 测试block的代码是否能正确执行 + * + * @param block + * @throws Exception msg=00001 则说明是正常执行 + */ + @Transactional(rollbackFor = Exception.class) + public void tryExecute(Block block) throws Exception{ + execute(block); + throw new Exception("00001"); + } } diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index c3006be..b14fc56 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -54,6 +54,8 @@ public class ClientStarter { private String appId; @Value("${name}") private String name; + @Value("${singeNode}") + private Boolean singeNode; private Logger logger = LoggerFactory.getLogger(getClass()); @@ -209,6 +211,7 @@ public class ClientStarter { pbft = 1; } //如果要单节点测试,此处返回值改为0 + if(singeNode) return 0; return pbft; } diff --git a/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java b/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java index 3ab2e49..032c81d 100644 --- a/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java @@ -1,16 +1,26 @@ package com.mindata.blockchain.socket.distruptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.lmax.disruptor.EventHandler; import com.mindata.blockchain.ApplicationContextProvider; import com.mindata.blockchain.socket.distruptor.base.BaseEvent; +import com.mindata.blockchain.socket.handler.server.PbftVoteHandler; /** * @author wuweifeng wrote on 2018/4/20. */ public class DisruptorServerHandler implements EventHandler { + + private Logger logger = LoggerFactory.getLogger(DisruptorServerHandler.class); @Override public void onEvent(BaseEvent baseEvent, long sequence, boolean endOfBatch) throws Exception { - ApplicationContextProvider.getBean(DisruptorServerConsumer.class).receive(baseEvent); + try { + ApplicationContextProvider.getBean(DisruptorServerConsumer.class).receive(baseEvent); + } catch (Exception e) { + logger.error("Disruptor事件执行异常",e); + } } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java index 8e53d00..c3027ac 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java @@ -47,7 +47,7 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { //添加进去 voteMsgs.add(voteMsg); - //如果我已经对该hash的commit投过票了,就不再继续 + //如果已经对该hash投过票了,就不再继续 if (voteStateConcurrentHashMap.get(hash) != null) { return; } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java index 11dea94..5e2a944 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java @@ -1,17 +1,19 @@ package com.mindata.blockchain.socket.pbft.queue; -import com.mindata.blockchain.ApplicationContextProvider; -import com.mindata.blockchain.block.Block; -import com.mindata.blockchain.core.event.AddBlockEvent; -import com.mindata.blockchain.socket.pbft.msg.VoteMsg; +import java.util.List; + +import javax.annotation.Resource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.List; +import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.block.Block; +import com.mindata.blockchain.core.event.AddBlockEvent; +import com.mindata.blockchain.socket.pbft.msg.VoteMsg; /** * Confirm阶段的消息队列 @@ -30,11 +32,6 @@ public class CommitMsgQueue extends AbstractVoteMsgQueue { protected void deal(VoteMsg voteMsg, List voteMsgs) { String hash = voteMsg.getHash(); - //如果已经落地过了 - if (voteStateConcurrentHashMap.get(hash) != null) { - return; - } - //通过校验agree数量,来决定是否在本地生成Block long count = voteMsgs.stream().filter(VoteMsg::isAgree).count(); logger.info("已经commit为true的数量为:"+ count); diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java index bce3233..7c34551 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java @@ -15,6 +15,7 @@ import com.mindata.blockchain.block.Block; import com.mindata.blockchain.common.AppId; import com.mindata.blockchain.common.timer.TimerManager; import com.mindata.blockchain.core.event.AddBlockEvent; +import com.mindata.blockchain.core.sqlite.SqliteManager; import com.mindata.blockchain.socket.pbft.VoteType; import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; import com.mindata.blockchain.socket.pbft.msg.VoteMsg; @@ -29,6 +30,8 @@ import cn.hutool.core.bean.BeanUtil; */ @Component public class PreMsgQueue extends BaseMsgQueue { + @Resource + private SqliteManager sqliteManager; @Resource private PrepareMsgQueue prepareMsgQueue; @Resource @@ -53,6 +56,18 @@ public class PreMsgQueue extends BaseMsgQueue { logger.info("拒绝进入Prepare阶段,hash为" + hash); return; } + // 检测脚本是否正常 + try { + sqliteManager.tryExecute(votePreMsg.getBlock()); + } catch (Exception e) { + if(!"00001".equals(e.getMessage())){ + // 执行异常 + return; + }else{ + logger.info("指令预校验执行成功!"); + } + } + //存入Pre集合中 blockConcurrentHashMap.put(hash, votePreMsg); @@ -61,7 +76,7 @@ public class PreMsgQueue extends BaseMsgQueue { BeanUtil.copyProperties(voteMsg, prepareMsg); prepareMsg.setVoteType(VoteType.PREPARE); prepareMsg.setAppId(AppId.value); - eventPublisher.publishEvent(new MsgPrepareEvent(voteMsg)); + eventPublisher.publishEvent(new MsgPrepareEvent(prepareMsg)); } /** diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java index 296d479..5cff848 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java @@ -39,11 +39,6 @@ public class PrepareMsgQueue extends AbstractVoteMsgQueue { @Override protected void deal(VoteMsg voteMsg, List voteMsgs) { String hash = voteMsg.getHash(); - //如果我已经对该hash的commit投过票了,就不再继续 - if (voteStateConcurrentHashMap.get(hash) != null) { - return; - } - VoteMsg commitMsg = new VoteMsg(); BeanUtil.copyProperties(voteMsg, commitMsg); commitMsg.setVoteType(VoteType.COMMIT); -- Gitee From 71276ff215724dbcf5ea16c46790a37464afa832 Mon Sep 17 00:00:00 2001 From: andylo25 Date: Tue, 12 Jun 2018 17:56:37 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmessage=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E6=97=B6=E9=83=A8=E5=88=86=E5=80=BC=E4=B8=BA=E7=A9=BA=E3=80=81?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=B6=E6=8A=A5No=20EntityManage=E5=BC=82?= =?UTF-8?q?=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/repository/MessageRepository.java | 3 +++ .../core/sqlparser/InstructionParserImpl.java | 11 ++++++++--- .../blockchain/core/sqlparser/MessageSqlParser.java | 13 +++++++++---- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java index 097c7a1..b81ba06 100644 --- a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java +++ b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java @@ -1,5 +1,7 @@ package com.mindata.blockchain.core.repository; +import org.springframework.transaction.annotation.Transactional; + import com.mindata.blockchain.core.model.MessageEntity; /** @@ -10,6 +12,7 @@ public interface MessageRepository extends BaseRepository { * 删除一条记录 * @param messageId messageId */ + @Transactional void deleteByMessageId(String messageId); /** diff --git a/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java b/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java index ef23f5f..b120be8 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java +++ b/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java @@ -1,12 +1,14 @@ package com.mindata.blockchain.core.sqlparser; +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import com.mindata.blockchain.block.Instruction; import com.mindata.blockchain.block.InstructionBase; import com.mindata.blockchain.common.FastJsonUtil; import com.mindata.blockchain.core.model.base.BaseEntity; import com.mindata.blockchain.core.model.convert.ConvertTableName; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; /** * 将区块内指令解析并入库 @@ -29,6 +31,9 @@ public class InstructionParserImpl implements InstructionP T object = FastJsonUtil.toBean(json, clazz); for (AbstractSqlParser sqlParser : sqlParsers) { if (clazz.equals(sqlParser.getEntityClass())) { + if(instructionBase instanceof Instruction){ + object.setPublicKey(((Instruction)instructionBase).getPublicKey()); + } sqlParser.parse(operation, instructionBase.getInstructionId(), object); break; } diff --git a/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java b/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java index 3002d7d..21bf77a 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java +++ b/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java @@ -1,12 +1,16 @@ package com.mindata.blockchain.core.sqlparser; -import cn.hutool.core.bean.BeanUtil; +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + import com.mindata.blockchain.block.Operation; +import com.mindata.blockchain.common.CommonUtil; import com.mindata.blockchain.core.model.MessageEntity; import com.mindata.blockchain.core.repository.MessageRepository; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; /** * 解析语句入库的具体实现,Message表的 @@ -20,13 +24,14 @@ public class MessageSqlParser extends AbstractSqlParser { @Override public void parse(byte operation, String messageId, MessageEntity entity) { if (Operation.ADD == operation) { + entity.setCreateTime(CommonUtil.getNow()); entity.setMessageId(messageId); messageRepository.save(entity); } else if (Operation.DELETE == operation) { messageRepository.deleteByMessageId(messageId); } else if (Operation.UPDATE == operation) { MessageEntity messageEntity = messageRepository.findByMessageId(messageId); - BeanUtil.copyProperties(entity, messageEntity, "id", "createTime"); + BeanUtil.copyProperties(entity, messageEntity, CopyOptions.create().setIgnoreNullValue(true).setIgnoreProperties("id", "createTime")); messageRepository.save(messageEntity); } } -- Gitee From e6fbb213fcaea6385e8d650c73156b7f796acd6f Mon Sep 17 00:00:00 2001 From: andylo25 Date: Tue, 12 Jun 2018 17:58:34 +0800 Subject: [PATCH 6/6] =?UTF-8?q?singeNode=E9=BB=98=E8=AE=A4=E9=85=8D?= =?UTF-8?q?=E7=BD=AEfalse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/mindata/blockchain/socket/client/ClientStarter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index b14fc56..43acf29 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -54,7 +54,7 @@ public class ClientStarter { private String appId; @Value("${name}") private String name; - @Value("${singeNode}") + @Value("${singeNode:false}") private Boolean singeNode; private Logger logger = LoggerFactory.getLogger(getClass()); -- Gitee