diff --git a/.gitignore b/.gitignore index 2af7cefb0a3f1e7df2fc27b8421f0e16b460e680..f94d4027a8ce15849f175d5edb25a891ffb34e80 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,4 @@ build/ nbbuild/ dist/ nbdist/ -.nb-gradle/ \ No newline at end of file +.nb-gradle/ diff --git a/Dockerfile b/Dockerfile index 042f3e0bf08abd1767896772dc7a4cbdf9471c02..b46e52029c8c1569e6fb15cbe639c70f6ec855d4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,13 @@ -FROM daocloud.io/brave8/maven-jdk8 -ADD md_blockchain-0.0.1-SNAPSHOT.jar /tmp/app.jar -VOLUME /tmp +FROM openjdk:8-jdk-alpine +##注意,此处是将编译好的jar整个移动 +ADD md_blockchain-0.0.1-SNAPSHOT.jar /work/app.jar +VOLUME /work +ENV LANG en_US.UTF-8 +ENV LANGUAGE en_US:en +ENV LC_ALL en_US.UTF-8 +ENV TZ Asia/Shanghai + EXPOSE 8080 -ENTRYPOINT ["java","-jar","/tmp/app.jar"] \ No newline at end of file + +ENTRYPOINT ["java","-jar","/work/app.jar"] \ No newline at end of file diff --git a/LICENSE b/LICENSE index c5e4d1ef9b27a348ced0e05f31f9c214a9d42f5a..55bcb8eba9dd111eebdf6783c92cc7e2785096a9 100644 --- a/LICENSE +++ b/LICENSE @@ -2,6 +2,8 @@ Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ +author wuweifeng + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. diff --git a/README.md b/README.md index e5548ef1258400f163a2f68fa67dd49b1f45a4e8..a65e08c36c9e4530765d1fab9856203d7f7f49d8 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,20 @@ +京东官方区块链项目JDChain,https://gitee.com/jdchain/jdchain + +有研究微服务网关权限的,在网关zuul中对所有下游服务权限做控制,覆盖到所有接口,权限控制到角色、菜单、按钮、方法。基于zuul纯内存的方式,校验时性能无损耗。参考我另一个项目 https://gitee.com/tianyalei/zuulauth + +有对多线程并行调度感兴趣的,参考另一个项目 https://gitee.com/jd-platform-opensource/asyncTool 该并发框架支持任意的多线程并行、串行、阻塞、依赖、回调,可以任意组合各线程的执行顺序,还带全链路回调。该项目在京东app后台正在试用,有海量用户、高并发等各种复杂极端场景。是作为Java程序员学习多线程的不可多得的好项目。 + +有对热key探测功能有需求,毫秒级探测热点数据,毫秒级推送至服务器集群内存,大幅降低热key对数据层查询压力,高性能热key探测中间件hotkey,https://gitee.com/jd-platform-opensource/hotkey。 + # md_blockchain -Java区块链平台,基于Springboot开发的区块链平台。区块链qq交流群737858576,刚建的群,一起学习区块链平台开发。 +Java区块链平台,基于Springboot开发的区块链平台。区块链qq交流群737858576,一起学习区块链平台开发,当然也交流Springboot、springcloud、机器学习等知识。 ### 起因 公司要开发区块链,原本是想着使用以太坊开发个合约或者是使用个第三方平台来做,后来发现都不符合业务需求。原因很简单,以太坊、超级账本等平台都是做共享账本的,有代币和挖矿等模块。而我们需要的就是数家公司组个联盟,来共同见证、记录一些不可篡改的交互信息,如A公司给B公司发了一个xxx请求,B公司响应了什么什么。其实要的就是一个分布式数据库,而且性能要好,不能像比特币那种10分钟才生成一个区块。我们要的更多的是数据库的性能,和区块链的一些特性。 ### 经过 -项目于3月初开始研发,历时一月发布了第一版。主要做了存储模块、加密模块、网络通信、公钥私钥、区块内容解析落地入库等。已经初步具备了区块链的基本特征,但在共识机制、merkle tree、智能合约以及其他的一些细节上,尚不到位。 +项目于18年3月初开始研发,历时一月发布了第一版。主要做了存储模块、加密模块、网络通信、PBFT共识算法、公钥私钥、区块内容解析落地入库等。已经初步具备了区块链的基本特征,但在merkle tree、智能合约以及其他的一些细节上,尚不到位。 希望高手不吝赐教,集思广益,提出见解或方案,来做一个区块链平台项目,适合更多的区块链场景,而不仅仅是账本和各种忽悠人的代币。 @@ -20,19 +28,23 @@ Java区块链平台,基于Springboot开发的区块链平台。区块链qq交 该项目属于"链",非"币"。不涉及虚拟币和挖矿。 ### 存储模块 -Block内存储的是类Sql语句。联盟间预先设定好符合业务场景需要的数据库表结构,然后设定好各个节点对表的操作权限(ADD,UPDATE,DELETE),将来各个节点就可以按照自己被允许的权限,进行Sql语句的编写,并打包至Block中,再全网广播,等待全网校验签名、权限等信息的合法性。如果Block合法,则允许生成,生成后再全网广播,各节点拉取新区块。新区块生成后,各节点进行区块内容解析,并落地入库的操作。 +Block内存储的是类Sql语句。联盟间预先设定好符合业务场景需要的数据库表结构,然后设定好各个节点对表的操作权限(ADD,UPDATE,DELETE),将来各个节点就可以按照自己被允许的权限,进行Sql语句的编写,并打包至Block中,再全网广播,等待全网校验签名、权限等信息的合法性。如果Block合法,则进入PBFT共识算法机制,各节点开始按照PrePrepare、Prepare、Commit等状态依次执行,直到2f+1个commit后,开始进行本地生成新区块。新区块生成后,各节点进行区块内容解析,并落地入库的操作。 场景就比较广泛了,可以设定不同的表结构,或者多个表,进而能完成各自类型信息的存储。譬如商品溯源,从生产商、运输、经销商、消费者等,每个环节都可以对某个商品进行ADD信息的操作。 -存储采用的是key-value数据库rocksDB,了解比特币的知道,比特币用的是levelDB,都是类似的东西。最近发现在部分Windows下,rocksDB加载失败。也可以替换为levelDB,只需要修改yml中db.levelDB为true,db.RocksDB为false即可。 +存储采用的是key-value数据库rocksDB,了解比特币的知道,比特币用的是levelDB,都是类似的东西。可以通过修改yml中db.levelDB为true,db.RocksDB为false来动态切换使用哪个数据库。 + +结构类似于sql的语句,如ADD(增删改) tableName(表名)ID(主键) JSON(该记录的json)。这里设置了回滚的逻辑,也就是当你做了一个ADD操作时,会同时存储一条Delete语句,以用于将来可能的回滚操作。 + -结构类似于sql的语句,如ADD(增删改) tableName(表名)ID(主键) JSON(该记录的json)。 ### 网络模块 网络层,采用的是各节点互相长连接、断线重连,然后维持心跳包。网络框架使用的是t-io,也是oschina的知名开源项目。t-io采用了AIO的方式,在大量长连接情况下性能优异,资源占用也很少,并且具备group功能,特别适合于做多个联盟链的SaaS平台。并且包含了心跳包、断线重连、retry等优秀功能。 在项目中,每个节点即是server,又是client,作为server则被其他的N-1个节点连接,作为client则去连接其他N-1个节点的server。同一个联盟,设定一个Group,每次发消息,直接调用sendGroup方法即可。 +但仍需要注意的是,由于项目采用了pbft共识算法,在达到共识的过程中,会产生N的3次方数量的网络通信,当节点数量较多,如已达到100时,每次共识将会给网络带来沉重的负担。这是算法本身的限制。 + ### 共识模块PBFT 分布式共识算法是分布式系统的核心,常见的有Paxos、pbft、bft、raft、pow等。区块链中常见的是POW、POS、DPOS、pbft等。 @@ -41,9 +53,9 @@ Block内存储的是类Sql语句。联盟间预先设定好符合业务场景需 区块链分如下三类: -私有链:这是指在企业内部部署的区块链应用,所有节点都是可以信任的; +私有链:这是指在企业内部部署的区块链应用,所有节点都是可以信任的,不存在恶意节点; -联盟链:半封闭生态的交易网络,存在不对等信任的节点; +联盟链:半封闭生态的交易网络,存在不对等信任的节点,可能存在恶意节点; 公有链:开放生态的交易网络,为联盟链和私有链等提供全球交易网络。 @@ -63,6 +75,8 @@ Block内存储的是类Sql语句。联盟间预先设定好符合业务场景需 (5)如果一个节点收到2f+1条(包括自己)commit消息,即可提交新区块到本地的区块链和状态数据库。 +(6)客户端收到f + 1个成功(即便有f个失败、再f个恶意返回的错误信息,f + 1个正确的也是多数派)的返回,即可认为该次写入请求是成功的。 + 可以看到,传统的pbft是需要先选举出leader的,然后由leader来搜集交易,并打包,然后广播出去。然后各个节点开始对新Block进行校验、投票、累积commit数量,最后落地。 而我这里对pbft做了修改,这是一个联盟,各个节点是平等的,而且性能要高。所以我不想让每个节点都生成一个指令后,发给其他节点,再大家选举出一个节点来搜集网络上的指令组合再生成Block,太复杂了,而且又存在了leader节点的故障隐患。 @@ -91,7 +105,7 @@ Block内存储的是类Sql语句。联盟间预先设定好符合业务场景需 可以通过访问localhost:8080/block?content=1来生成一个区块。正常使用时至少要启动4个节点才行,否则无法达成共识,PBFT要求2f+1个节点同意才能生成Block。为了方便测试,可以直接修改pbftSize的返回值为0,这样就能自己一个节点玩起来了。如果有多个节点,在生成Block后就会发现别的节点也会自动同步自己新生成的Block。目前代码里默认设置了一张表message,里面也只有一个字段content,相当于一个简单的区块链记事本。当有4个节点时,可以通过并发访问其中的几个来同时生成Block进行测试,看是否会分叉。还可以关停其中的一个,看其他的三个是否能达成共识(拜占庭最多容许f个节点故障,4个节点允许1个故障),恢复故障的那个,看是否能够同步其他正常节点的Block。可以进行各种测试,欢迎提bug。 -可以通过localhost:8080/sqlite来查看sqlite里存的数据,就是根据Block里的sql语句执行后的结果。 +可以通过localhost:8080/block/sqlite来查看sqlite里存的数据,就是根据Block里的sql语句执行后的结果。 我把项目部署到docker里了,共启动4个节点,如图: ![输入图片说明](https://gitee.com/uploads/images/2018/0404/105151_c8931604_303698.png "1.png") diff --git a/md_blockchain-0.0.1-SNAPSHOT.jar b/md_blockchain-0.0.1-SNAPSHOT.jar deleted file mode 100644 index 3f0b64a912fd3b1e560db2af73e77181d5b19e73..0000000000000000000000000000000000000000 Binary files a/md_blockchain-0.0.1-SNAPSHOT.jar and /dev/null differ diff --git a/pom.xml b/pom.xml index 87bda85cf35d3227e4caf518352ce1d96321c426..7a27a6245b6a2702df57ea249a2ce3c483f0c65c 100644 --- a/pom.xml +++ b/pom.xml @@ -1,6 +1,6 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.mindata.blockchain @@ -9,7 +9,7 @@ jar md_blockchain - Demo project for Spring Boot + java blockchain based on Spring Boot org.springframework.boot @@ -22,9 +22,24 @@ UTF-8 UTF-8 1.8 - 1.2.29 + 1.2.83 + 2.7.0 + 1.7 + + + ali-repos + ali Repository + http://maven.aliyun.com/nexus/content/groups/public + + + center-repos + center Repository + http://repo1.maven.org/maven2 + + + org.springframework.boot @@ -54,11 +69,11 @@ org.springframework.boot spring-boot-starter-data-jpa - - org.xerial - sqlite-jdbc - 3.20.0 - + + org.xerial + sqlite-jdbc + 3.20.0 + org.apache.commons @@ -71,14 +86,14 @@ tio-core 2.2.0.v20180405-RELEASE - - + + org.rocksdb rocksdbjni - 5.10.3 + 5.18.3 - + org.iq80.leveldb @@ -91,18 +106,18 @@ 0.10 - - - com.lmax - disruptor - 3.4.1 - - - - com.madgag.spongycastle - core - 1.54.0.0 - + + + com.lmax + disruptor + 3.4.1 + + + + com.madgag.spongycastle + core + 1.54.0.0 + org.bouncycastle bcprov-jdk15on @@ -113,7 +128,23 @@ commons-codec 1.10 - + + + io.springfox + springfox-swagger2 + ${swagger.version} + + + io.springfox + springfox-swagger-ui + ${swagger.version} + + + + com.github.xiaoymin + swagger-bootstrap-ui + ${swagger.bootstrap.version} + diff --git a/src/main/java/com/mindata/blockchain/block/check/BlockChecker.java b/src/main/java/com/mindata/blockchain/block/check/BlockChecker.java index a33ef3f93f605d0dc8e3b4ef49471919f7e3c62c..44fbd5af3f2c690d862688e296396162ae935ce7 100644 --- a/src/main/java/com/mindata/blockchain/block/check/BlockChecker.java +++ b/src/main/java/com/mindata/blockchain/block/check/BlockChecker.java @@ -40,4 +40,19 @@ public interface BlockChecker { * @return block */ int checkTime(Block block); + + /** + * 校验签名 + * @param block block + * @return block + */ + int checkSign(Block block); + + /** + * 校验block,包括签名、hash、关联关系 + * @param block + * @return + */ + public String checkBlock(Block block); + } diff --git a/src/main/java/com/mindata/blockchain/block/check/CheckerManager.java b/src/main/java/com/mindata/blockchain/block/check/CheckerManager.java index 96ae55da0bc6930d1c319ccaa8e806fe9c2792d7..ac8ba6a3b8e590414fe71ef4198ab81371b25af5 100644 --- a/src/main/java/com/mindata/blockchain/block/check/CheckerManager.java +++ b/src/main/java/com/mindata/blockchain/block/check/CheckerManager.java @@ -21,6 +21,11 @@ public class CheckerManager { * @return 校验结果 */ public RpcCheckBlockBody check(Block block) { + int code= blockChecker.checkSign(block); + if (code != 0) { + return new RpcCheckBlockBody(-1, "block的签名不合法"); + } + int number = blockChecker.checkNum(block); if (number != 0) { return new RpcCheckBlockBody(-1, "block的number不合法"); diff --git a/src/main/java/com/mindata/blockchain/block/check/local/DbBlockChecker.java b/src/main/java/com/mindata/blockchain/block/check/local/DbBlockChecker.java index 84f8d0e5e71e7610de034f1ed5d41f782b5083ed..56aad6527683e9085c025e0bf837bd45fe716dff 100644 --- a/src/main/java/com/mindata/blockchain/block/check/local/DbBlockChecker.java +++ b/src/main/java/com/mindata/blockchain/block/check/local/DbBlockChecker.java @@ -3,8 +3,13 @@ package com.mindata.blockchain.block.check.local; import cn.hutool.core.util.StrUtil; import com.mindata.blockchain.block.Block; import com.mindata.blockchain.block.check.BlockChecker; +import com.mindata.blockchain.common.Sha256; +import com.mindata.blockchain.common.exception.TrustSDKException; import com.mindata.blockchain.core.manager.DbBlockManager; import com.mindata.blockchain.core.manager.PermissionManager; +import com.mindata.blockchain.core.requestbody.BlockRequestBody; +import com.mindata.blockchain.core.service.BlockService; + import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -19,6 +24,9 @@ public class DbBlockChecker implements BlockChecker { private DbBlockManager dbBlockManager; @Resource private PermissionManager permissionManager; + + @Resource + private BlockService blockService; @Override public int checkNum(Block block) { @@ -66,8 +74,60 @@ public class DbBlockChecker implements BlockChecker { } return 0; } + + @Override + public int checkSign(Block block) { + if(!checkBlockHashSign(block)) { + return -1; + } + return 0; + } private Block getLastBlock() { return dbBlockManager.getLastBlock(); } + + public String checkBlock(Block block) { + if(!checkBlockHashSign(block)) return block.getHash(); + + String preHash = block.getBlockHeader().getHashPreviousBlock(); + if(preHash == null) return null; + + Block preBlock = dbBlockManager.getBlockByHash(preHash); + if(preBlock == null) return block.getHash(); + + int localNum = preBlock.getBlockHeader().getNumber(); + //当前区块+1等于下一个区块时才同意 + if (localNum + 1 != block.getBlockHeader().getNumber()) { + return block.getHash(); + } + if(block.getBlockHeader().getTimeStamp() <= preBlock.getBlockHeader().getTimeStamp()) { + return block.getHash(); + } + + + return null; + } + + /** + * 检测区块签名及hash是否符合 + * @param block + * @return + */ + private boolean checkBlockHashSign(Block block) { + BlockRequestBody blockRequestBody = new BlockRequestBody(); + blockRequestBody.setBlockBody(block.getBlockBody()); + blockRequestBody.setPublicKey(block.getBlockHeader().getPublicKey()); + try { + if(blockService.check(blockRequestBody) != null) return false; + } catch (TrustSDKException e) { + return false; + } + + String hash = Sha256.sha256(block.getBlockHeader().toString() + block.getBlockBody().toString()); + if(!StrUtil.equals(block.getHash(),hash)) return false; + + return true; + } + } diff --git a/src/main/java/com/mindata/blockchain/block/db/RocksDbStoreImpl.java b/src/main/java/com/mindata/blockchain/block/db/RocksDbStoreImpl.java index 28fb3fc8c3bf54f9c387e5f4259ed654a8c9e974..17d5cd4676d93d470f41338b9963d9836e939a41 100644 --- a/src/main/java/com/mindata/blockchain/block/db/RocksDbStoreImpl.java +++ b/src/main/java/com/mindata/blockchain/block/db/RocksDbStoreImpl.java @@ -34,7 +34,7 @@ public class RocksDbStoreImpl implements DbStore { try { byte[] bytes = rocksDB.get(key.getBytes(Const.CHARSET)); if (bytes != null) { - return new String(bytes); + return new String(bytes, Const.CHARSET); } return null; } catch (Exception e) { diff --git a/src/main/java/com/mindata/blockchain/common/CommonUtil.java b/src/main/java/com/mindata/blockchain/common/CommonUtil.java index d5c31c06c19cf18e1abb90a05c95bb6c27a2f8b9..57676f8a6003f5ea82d30aeb9c54e840662c28fb 100644 --- a/src/main/java/com/mindata/blockchain/common/CommonUtil.java +++ b/src/main/java/com/mindata/blockchain/common/CommonUtil.java @@ -18,10 +18,13 @@ public class CommonUtil { System.out.println(inetAddress.getHostName()); } + /** + * 获取本机ip地址 + */ public static String getLocalIp() { InetAddress inetAddress = getLocalHostLANAddress(); if (inetAddress != null) { - return inetAddress.getHostName(); + return inetAddress.getHostAddress(); } return null; } diff --git a/src/main/java/com/mindata/blockchain/common/SwaggerConfig.java b/src/main/java/com/mindata/blockchain/common/SwaggerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..7d14af34f28fefaca87730cc79eaa883f5033e9a --- /dev/null +++ b/src/main/java/com/mindata/blockchain/common/SwaggerConfig.java @@ -0,0 +1,112 @@ +package com.mindata.blockchain.common; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.context.request.async.DeferredResult; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.builders.ResponseMessageBuilder; +import springfox.documentation.schema.ModelRef; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.service.Contact; +import springfox.documentation.service.ResponseMessage; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; +import springfox.documentation.swagger2.annotations.EnableSwagger2; + +import java.util.ArrayList; + +/** + * swagger配置 + */ +@Configuration +@EnableSwagger2 +@ComponentScan(basePackages = {"com.mindata"}) +public class SwaggerConfig extends WebMvcConfigurerAdapter { + + @Override + public void addViewControllers( ViewControllerRegistry registry ) { + /*registry.addViewController( "/" ).setViewName("redirect:/swagger-ui.html");*/ + registry.addViewController( "/" ).setViewName("redirect:/doc.html"); + + registry.setOrder(Ordered.HIGHEST_PRECEDENCE ); + } + + /** + * 配置资源放行 + */ + @Override + public void addResourceHandlers(ResourceHandlerRegistry registry) { + /** + * 配置swagger映射路径 + */ + registry.addResourceHandler("swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/"); + registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/"); + } + + /*添加自定义异常信息*/ + private ArrayList responseMessages = new ArrayList() { + private static final long serialVersionUID = 1L; + { + add(new ResponseMessageBuilder().code(200).message("请求成功").build()); + add(new ResponseMessageBuilder().code(400).message("请求参数错误").responseModel(new ModelRef("Error")).build()); + add(new ResponseMessageBuilder().code(401).message("权限认证失败").responseModel(new ModelRef("Error")).build()); + add(new ResponseMessageBuilder().code(404).message("请求资源不存在").responseModel(new ModelRef("Error")).build()); + add(new ResponseMessageBuilder().code(405).message("请求方式不支持").responseModel(new ModelRef("Error")).build()); + add(new ResponseMessageBuilder().code(500).message("服务器内部错误").responseModel(new ModelRef("Error")).build()); + } + }; + + /** + * swagger分组OpenApi + * @return + */ + @Bean + public Docket customDocket() { + //自定义异常信息 + return new Docket(DocumentationType.SWAGGER_2) + .enable(true) + .groupName("OpenApi") + .genericModelSubstitutes(DeferredResult.class) + .useDefaultResponseMessages(false) + .forCodeGeneration(true) + .select() + .apis(RequestHandlerSelectors.basePackage("com.mindata.blockchain.core.controller")) + .paths(PathSelectors.any()) + .build() + .globalResponseMessage(RequestMethod.GET, responseMessages) + .globalResponseMessage(RequestMethod.POST, responseMessages) + .globalResponseMessage(RequestMethod.PUT, responseMessages) + .globalResponseMessage(RequestMethod.DELETE, responseMessages) + .globalResponseMessage(RequestMethod.PATCH, responseMessages) + .apiInfo(apiInfo()); + } + /** + * swagger分组OpenApi + * 配置文档说明信息 + * @return + */ + private ApiInfo apiInfo() { + Contact contact = new Contact("付为地", "", "1335157415@qq.com"); + StringBuffer sb=new StringBuffer(1024); + sb.append("        Swagger是一款RESTFUL接口的文档在线自动生成+功能测试功能软件,用于生成、描述、调用和可视化 RESTful风格的 Web服务。
总体目标是使客户端和文件系统作为服务器以同样的速度来更新,文件的方法,参数和模型紧密集成到服务器端的代码,允许API来始终保持同步。
"); + sb.append("注意事项:

    "); + sb.append("                
  • 1.本接口文档,如果没有特殊限定,默认仅支持POST方式。
  • "); + sb.append("                
  • 2.参数描述模型示例,分别针对请求的参数结构参数示例进行描述。
  • "); + sb.append("                
  • 3.响应描述模型示例,分别针对响应的数据结构数据示例进行描述。
"); + return new ApiInfoBuilder() + .title("API开放接口") + .description(sb.toString()) + .contact(contact) + .version("1.1.0") + .build(); + } + +} \ No newline at end of file diff --git a/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java b/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java new file mode 100644 index 0000000000000000000000000000000000000000..2a11451f49ca59abc750516774e3fb6d3e7b9461 --- /dev/null +++ b/src/main/java/com/mindata/blockchain/common/timer/TimerManager.java @@ -0,0 +1,46 @@ +package com.mindata.blockchain.common.timer; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * 异步延时执行 + * 需要调用延迟执行的时候可使用,避免在代码里面直接调用Thread.sleep()方法 + * + * @author andylo25 wrote on 2018/6/11. + */ +public class TimerManager { + + private volatile static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3); + + public static void schedule(Supplier action, long delay) { + executorService.schedule(new Runnable() { + @Override + public void run() { + action.get(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + public static void scheduleAtFixedRate(Supplier action, long initialDelay, long period) { + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + action.get(); + } + }, initialDelay, period, TimeUnit.MILLISECONDS); + } + + public static void scheduleWithFixedDelay(Supplier action, long initialDelay, long period) { + executorService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + action.get(); + } + }, initialDelay, period, TimeUnit.MILLISECONDS); + } + + +} diff --git a/src/main/java/com/mindata/blockchain/core/controller/BlockController.java b/src/main/java/com/mindata/blockchain/core/controller/BlockController.java index 35a6fbc825c1cbff294a28d8756825b37434212b..73d89178d570f7c7f63b9e6a4fcf419347ff0c21 100644 --- a/src/main/java/com/mindata/blockchain/core/controller/BlockController.java +++ b/src/main/java/com/mindata/blockchain/core/controller/BlockController.java @@ -5,6 +5,7 @@ import com.mindata.blockchain.ApplicationContextProvider; import com.mindata.blockchain.block.Block; import com.mindata.blockchain.block.Instruction; import com.mindata.blockchain.block.Operation; +import com.mindata.blockchain.block.check.BlockChecker; import com.mindata.blockchain.common.exception.TrustSDKException; import com.mindata.blockchain.core.bean.BaseData; import com.mindata.blockchain.core.bean.ResultGenerator; @@ -12,6 +13,7 @@ import com.mindata.blockchain.core.event.DbSyncEvent; import com.mindata.blockchain.core.manager.DbBlockManager; import com.mindata.blockchain.core.manager.MessageManager; import com.mindata.blockchain.core.manager.SyncManager; +import com.mindata.blockchain.core.model.MessageEntity; import com.mindata.blockchain.core.requestbody.BlockRequestBody; import com.mindata.blockchain.core.requestbody.InstructionBody; import com.mindata.blockchain.core.service.BlockService; @@ -21,15 +23,23 @@ import com.mindata.blockchain.socket.client.PacketSender; import com.mindata.blockchain.socket.packet.BlockPacket; import com.mindata.blockchain.socket.packet.PacketBuilder; import com.mindata.blockchain.socket.packet.PacketType; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Pageable; import org.springframework.data.web.PageableDefault; +import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import springfox.documentation.annotations.ApiIgnore; import javax.annotation.Resource; /** * @author wuweifeng wrote on 2018/3/7. */ +@Api(tags = "区块链接口", description = "简单区块链功能接口") @RestController @RequestMapping("/block") public class BlockController { @@ -45,6 +55,12 @@ public class BlockController { private SyncManager syncManager; @Resource private MessageManager messageManager; + @Resource + private BlockChecker blockChecker; + @Value("${publicKey:A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54}") + private String publicKey; + @Value("${privateKey:yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8=}") + private String privateKey; /** * 添加一个block,需要先在InstructionController构建1-N个instruction指令,然后调用该接口生成Block @@ -53,42 +69,113 @@ public class BlockController { * 指令的集合 * @return 结果 */ - @PostMapping - public BaseData add(@RequestBody BlockRequestBody blockRequestBody) throws TrustSDKException { - if (blockService.check(blockRequestBody) != null) { - return ResultGenerator.genFailResult(blockService.check(blockRequestBody)); + @ApiIgnore + @PostMapping("/insert") + @ApiOperation(value = "添加一个区块", notes = "测试添加一个区块", httpMethod = "POST", response = BaseData.class) + public BaseData insert(@ApiParam(name = "blockRequestBody对象", value = "传入json格式", required = true) @RequestBody BlockRequestBody blockRequestBody) throws TrustSDKException { + String msg = blockService.check(blockRequestBody); + if (msg != null) { + return ResultGenerator.genFailResult(msg); } return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); } /** - * 测试生成一个Block,公钥私钥可以通过PairKeyController来生成 + * 测试生成一个insert:Block,公钥私钥可以通过PairKeyController来生成 * @param content * sql内容 */ - @GetMapping - public BaseData test(String content) throws Exception { + @GetMapping("/create") + @ApiOperation(value = "创建一个区块", notes = "创建一个新区块", httpMethod = "GET", response = BaseData.class) + public BaseData create(@ApiParam(name = "content", value = "区块链内容", required = true) @RequestParam(value = "content") String content) throws Exception { InstructionBody instructionBody = new InstructionBody(); instructionBody.setOperation(Operation.ADD); instructionBody.setTable("message"); instructionBody.setJson("{\"content\":\"" + content + "\"}"); - instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); - instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8="); + /*instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8=");*/ + instructionBody.setPublicKey(publicKey); + instructionBody.setPrivateKey(privateKey); Instruction instruction = instructionService.build(instructionBody); BlockRequestBody blockRequestBody = new BlockRequestBody(); - blockRequestBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); - blockRequestBody.setBlockBody(blockBody); + blockRequestBody.setBlockBody(blockBody); return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); } + + /** + * 测试生成一个update:Block,公钥私钥可以通过PairKeyController来生成 + * @param id 更新的主键 + * @param content + * sql内容 + */ + @GetMapping("update") + @ApiOperation(value = "更新区块链内容", notes = "根据ID更新区块链内容", httpMethod = "GET", response = BaseData.class) + public BaseData testUpdate(@ApiParam(name = "id", value = "区块链信息编号", required = true) @RequestParam(value = "id",required = true) String id, + @ApiParam(name = "content", value = "区块链内容", required = true) @RequestParam(value = "content") String content) throws Exception { + if(StringUtils.isBlank(id)) ResultGenerator.genSuccessResult("主键不可为空"); + InstructionBody instructionBody = new InstructionBody(); + instructionBody.setOperation(Operation.UPDATE); + instructionBody.setTable("message"); + instructionBody.setInstructionId(id); + instructionBody.setJson("{\"content\":\"" + content + "\"}"); + /*instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8=");*/ + instructionBody.setPublicKey(publicKey); + instructionBody.setPrivateKey(privateKey); + Instruction instruction = instructionService.build(instructionBody); + + BlockRequestBody blockRequestBody = new BlockRequestBody(); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); + com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); + blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); + + blockRequestBody.setBlockBody(blockBody); + + return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); + } + + /** + * 测试生成一个delete:Block,公钥私钥可以通过PairKeyController来生成 + * @param id 待删除记录的主键 + * sql内容 + */ + @GetMapping("delete") + @ApiOperation(value = "删除区块内容", notes = "删除区块链内容", httpMethod = "GET", response = BaseData.class) + public BaseData delete(@ApiParam(name = "id", value = "区块链信息编号", required = true) @RequestParam(value = "id",required = true) String id) throws Exception { + if(StringUtils.isBlank(id)) ResultGenerator.genSuccessResult("主键不可为空"); + InstructionBody instructionBody = new InstructionBody(); + instructionBody.setOperation(Operation.DELETE); + instructionBody.setTable("message"); + instructionBody.setInstructionId(id); + MessageEntity message=messageManager.findById(id); + String content=ObjectUtils.isEmpty(message)?"":message.getContent(); + instructionBody.setJson("{\"content\":\"" + content + "\"}"); + /*instructionBody.setPublicKey("A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54"); + instructionBody.setPrivateKey("yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8=");*/ + instructionBody.setPublicKey(publicKey); + instructionBody.setPrivateKey(privateKey); + Instruction instruction = instructionService.build(instructionBody); + + BlockRequestBody blockRequestBody = new BlockRequestBody(); + blockRequestBody.setPublicKey(instructionBody.getPublicKey()); + com.mindata.blockchain.block.BlockBody blockBody = new com.mindata.blockchain.block.BlockBody(); + blockBody.setInstructions(CollectionUtil.newArrayList(instruction)); + + blockRequestBody.setBlockBody(blockBody); + + return ResultGenerator.genSuccessResult(blockService.addBlock(blockRequestBody)); + } /** * 查询已落地的sqlite里的所有数据 */ + @ApiOperation(value = "查询区块链数据", notes = "查询区块链数据", httpMethod = "GET", response = BaseData.class) @GetMapping("sqlite") public BaseData sqlite() { return ResultGenerator.genSuccessResult(messageManager.findAll()); @@ -97,6 +184,7 @@ public class BlockController { /** * 查询已落地的sqlite里content字段 */ + @ApiOperation(value = "查询区块链内容", notes = "查询区块链内容", httpMethod = "GET", response = BaseData.class) @GetMapping("sqlite/content") public BaseData content() { return ResultGenerator.genSuccessResult(messageManager.findAllContent()); @@ -105,8 +193,9 @@ public class BlockController { /** * 获取最后一个block的信息 */ - @GetMapping("db") - public BaseData getRockDB() { + @ApiOperation(value = "获取最后一个块信息", notes = "获取最后一个块信息", httpMethod = "GET", response = BaseData.class) + @GetMapping("last") + public BaseData last() { return ResultGenerator.genSuccessResult(dbBlockManager.getLastBlock()); } @@ -117,19 +206,48 @@ public class BlockController { * @return * 已同步到哪块了的信息 */ + @ApiIgnore + @ApiOperation(value = "手工执行区块内sql落地到sqlite操作", notes = "获取数据同步到的区块信息", httpMethod = "GET", response = BaseData.class) @GetMapping("sync") - public BaseData sync(@PageableDefault Pageable pageable) { + public BaseData sync( @PageableDefault Pageable pageable) { ApplicationContextProvider.publishEvent(new DbSyncEvent("")); return ResultGenerator.genSuccessResult(syncManager.findAll(pageable)); } + + /** + * 全量检测区块是否正常 + * @return + * null - 通过 + * hash - 第一个异常hash + */ + @ApiIgnore + @ApiOperation(value = "全量检测区块是否正常", notes = "全量检测区块是否正常", httpMethod = "GET", response = BaseData.class) + @GetMapping("check") + public BaseData check() { + + Block block = dbBlockManager.getFirstBlock(); + + String hash = null; + while(block != null && hash == null) { + hash = blockChecker.checkBlock(block); + block = dbBlockManager.getNextBlock(block); + } + return ResultGenerator.genSuccessResult(hash); + } - @GetMapping("/next") - public BaseData nextBlock() { + /** + * 获取第一个区块信息 + */ + @ApiOperation(value = "获取第一个区块信息", notes = "获取第一个区块信息", httpMethod = "GET", response = BaseData.class) + @GetMapping("/first") + public BaseData first() { Block block = dbBlockManager.getFirstBlock(); BlockPacket packet = new PacketBuilder() .setType(PacketType.NEXT_BLOCK_INFO_REQUEST) .setBody(new RpcBlockBody(block)).build(); packetSender.sendGroup(packet); - return null; + return ResultGenerator.genSuccessResult(block); } + + } diff --git a/src/main/java/com/mindata/blockchain/core/controller/InstructionController.java b/src/main/java/com/mindata/blockchain/core/controller/InstructionController.java index 80182183ff0f240fa5535ecd0e05bbe6822a054b..77c76abc3f21b620a46cd27f43725c9167c4db5c 100644 --- a/src/main/java/com/mindata/blockchain/core/controller/InstructionController.java +++ b/src/main/java/com/mindata/blockchain/core/controller/InstructionController.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import springfox.documentation.annotations.ApiIgnore; import javax.annotation.Resource; @@ -15,6 +16,7 @@ import javax.annotation.Resource; * 区块body内单个指令的controller * @author wuweifeng wrote on 2018/3/7. */ +@ApiIgnore @RestController @RequestMapping("/instruction") public class InstructionController { diff --git a/src/main/java/com/mindata/blockchain/core/controller/PairKeyController.java b/src/main/java/com/mindata/blockchain/core/controller/PairKeyController.java index 121e19ccf44d9b0072222a5233ab7c9f15ff401a..862282603b23735b2d0244f30cf8be32e4bcb95e 100644 --- a/src/main/java/com/mindata/blockchain/core/controller/PairKeyController.java +++ b/src/main/java/com/mindata/blockchain/core/controller/PairKeyController.java @@ -4,6 +4,8 @@ import com.mindata.blockchain.common.exception.TrustSDKException; import com.mindata.blockchain.core.bean.BaseData; import com.mindata.blockchain.core.bean.ResultGenerator; import com.mindata.blockchain.core.service.PairKeyService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -13,6 +15,7 @@ import javax.annotation.Resource; /** * @author wuweifeng wrote on 2018/3/7. */ +@Api(tags = "区块链接口", description = "公私钥接口") @RestController @RequestMapping("/pairKey") public class PairKeyController { @@ -22,6 +25,7 @@ public class PairKeyController { /** * 生成公钥私钥 */ + @ApiOperation(value = "区块链公私钥接口", notes = "生成区块链节点公私钥", httpMethod = "GET", response = BaseData.class) @GetMapping("/random") public BaseData generate() throws TrustSDKException { return ResultGenerator.genSuccessResult(pairKeyService.generate()); diff --git a/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..d2690136c3c68a6f50b5e5f77994d83ce8479c66 --- /dev/null +++ b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java @@ -0,0 +1,21 @@ +package com.mindata.blockchain.core.event; + +import org.springframework.context.ApplicationEvent; +import org.tio.core.ChannelContext; + +/** + * 节点连接完成时会触发该Event + * @author andylo25 wrote on 2018/6/15. + */ +public class NodesConnectedEvent extends ApplicationEvent { + private static final long serialVersionUID = 526755692642414178L; + + public NodesConnectedEvent(ChannelContext channelContext) { + super(channelContext); + } + + public ChannelContext getSource() { + return (ChannelContext) source; + } + +} diff --git a/src/main/java/com/mindata/blockchain/core/manager/MessageManager.java b/src/main/java/com/mindata/blockchain/core/manager/MessageManager.java index e53caf9b367df3b6aca2c64f0dc24792b511007d..b8c8323bbb1b3eca44632194cd097f6c8d7a2980 100644 --- a/src/main/java/com/mindata/blockchain/core/manager/MessageManager.java +++ b/src/main/java/com/mindata/blockchain/core/manager/MessageManager.java @@ -23,4 +23,8 @@ public class MessageManager { public List findAllContent() { return findAll().stream().map(MessageEntity::getContent).collect(Collectors.toList()); } + + public MessageEntity findById(String id) { + return messageRepository.findByMessageId(id); + } } diff --git a/src/main/java/com/mindata/blockchain/core/service/InstructionService.java b/src/main/java/com/mindata/blockchain/core/service/InstructionService.java index b7d1bb72a53c2da1c69e80119b693b0bb3d3297c..ba2ff7be27ac968de9d085934ca8b31318a28dcd 100644 --- a/src/main/java/com/mindata/blockchain/core/service/InstructionService.java +++ b/src/main/java/com/mindata/blockchain/core/service/InstructionService.java @@ -60,8 +60,7 @@ public class InstructionService { instruction.setInstructionId(CommonUtil.generateUuid()); } instruction.setTimeStamp(CommonUtil.getNow()); - String buildStr = instructionBody.getOperation() + instructionBody.getTable() + instructionBody - .getInstructionId() + instructionBody.getJson() + instructionBody.getOldJson(); + String buildStr = getSignString(instruction); //设置签名,供其他人验证 instruction.setSign(TrustSDK.signString(instructionBody.getPrivateKey(), buildStr)); //设置hash,防止篡改 @@ -69,6 +68,11 @@ public class InstructionService { return instruction; } + + private String getSignString(Instruction instruction) { + return instruction.getOperation() + instruction.getTable() + instruction + .getInstructionId() + (instruction.getJson()==null?"":instruction.getJson()); + } /** * 根据一个指令,计算它的回滚时的指令。

@@ -95,14 +99,12 @@ public class InstructionService { } public boolean checkSign(Instruction instruction) throws TrustSDKException { - String buildStr = instruction.getOperation() + - instruction.getTable() + instruction.getJson(); + String buildStr = getSignString(instruction); return TrustSDK.verifyString(instruction.getPublicKey(), buildStr, instruction.getSign()); } public boolean checkHash(Instruction instruction) { - String buildStr = instruction.getOperation() + - instruction.getTable() + instruction.getJson(); + String buildStr = getSignString(instruction); return Sha256.sha256(buildStr).equals(instruction.getHash()); } } diff --git a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java index 184cf52e55d1faab98596946663864b54bfb4e24..fb47d5810c82638f5375b5cdc7cfe96c99e4be97 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java +++ b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.ArrayList; @@ -69,11 +70,13 @@ public class SqliteManager { /** * 根据一个block执行sql - * + * 整个block一个事务 + * * @param block * block */ - private void execute(Block block) { + @Transactional(rollbackFor = Exception.class) + public void execute(Block block) { List instructions = block.getBlockBody().getInstructions(); //InstructionParserImpl类里面执行的是InstructionBase,需要转成InstructionBase for (Instruction instruction : instructions) { @@ -93,7 +96,7 @@ public class SqliteManager { * @param block * block */ - private void rollBack(Block block) { + public void rollBack(Block block) { List instructions = block.getBlockBody().getInstructions(); int size = instructions.size(); //需要对语句集合进行反转,然后执行和execute一样的操作 @@ -109,4 +112,14 @@ public class SqliteManager { instructionParser.parse(instruction); } } + + /** + * 测试block的代码是否能正确执行 + * + * @param block block + */ + @Transactional(rollbackFor = Exception.class) + public void tryExecute(Block block) { + execute(block); + } } diff --git a/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java b/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java index ef23f5f714193f17c0aae1e0f4b63e3948449866..b120be88145ca3842b2f3d99440a4206ed1811c0 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java +++ b/src/main/java/com/mindata/blockchain/core/sqlparser/InstructionParserImpl.java @@ -1,12 +1,14 @@ package com.mindata.blockchain.core.sqlparser; +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import com.mindata.blockchain.block.Instruction; import com.mindata.blockchain.block.InstructionBase; import com.mindata.blockchain.common.FastJsonUtil; import com.mindata.blockchain.core.model.base.BaseEntity; import com.mindata.blockchain.core.model.convert.ConvertTableName; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; /** * 将区块内指令解析并入库 @@ -29,6 +31,9 @@ public class InstructionParserImpl implements InstructionP T object = FastJsonUtil.toBean(json, clazz); for (AbstractSqlParser sqlParser : sqlParsers) { if (clazz.equals(sqlParser.getEntityClass())) { + if(instructionBase instanceof Instruction){ + object.setPublicKey(((Instruction)instructionBase).getPublicKey()); + } sqlParser.parse(operation, instructionBase.getInstructionId(), object); break; } diff --git a/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java b/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java index 3002d7d35435958244466c76f3b9d08566a6f96b..21bf77a3ac49b8abf5d8fe7e4f4208f00a37ae40 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java +++ b/src/main/java/com/mindata/blockchain/core/sqlparser/MessageSqlParser.java @@ -1,12 +1,16 @@ package com.mindata.blockchain.core.sqlparser; -import cn.hutool.core.bean.BeanUtil; +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + import com.mindata.blockchain.block.Operation; +import com.mindata.blockchain.common.CommonUtil; import com.mindata.blockchain.core.model.MessageEntity; import com.mindata.blockchain.core.repository.MessageRepository; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; /** * 解析语句入库的具体实现,Message表的 @@ -20,13 +24,14 @@ public class MessageSqlParser extends AbstractSqlParser { @Override public void parse(byte operation, String messageId, MessageEntity entity) { if (Operation.ADD == operation) { + entity.setCreateTime(CommonUtil.getNow()); entity.setMessageId(messageId); messageRepository.save(entity); } else if (Operation.DELETE == operation) { messageRepository.deleteByMessageId(messageId); } else if (Operation.UPDATE == operation) { MessageEntity messageEntity = messageRepository.findByMessageId(messageId); - BeanUtil.copyProperties(entity, messageEntity, "id", "createTime"); + BeanUtil.copyProperties(entity, messageEntity, CopyOptions.create().setIgnoreNullValue(true).setIgnoreProperties("id", "createTime")); messageRepository.save(messageEntity); } } diff --git a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioHandler.java b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioHandler.java index 3e835386740d50c00e5dda6442486a51f4036dc6..cc064f1d3beafef36545b339d416da8961803d01 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioHandler.java @@ -1,11 +1,10 @@ package com.mindata.blockchain.socket.client; import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.socket.base.AbstractAioHandler; import com.mindata.blockchain.socket.distruptor.base.BaseEvent; import com.mindata.blockchain.socket.distruptor.base.MessageProducer; -import com.mindata.blockchain.socket.base.AbstractAioHandler; import com.mindata.blockchain.socket.packet.BlockPacket; -import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; import org.tio.client.intf.ClientAioHandler; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; @@ -18,7 +17,8 @@ public class BlockClientAioHandler extends AbstractAioHandler implements ClientA @Override public BlockPacket heartbeatPacket() { //心跳包的内容就是隔一段时间向别的节点获取一次下一步区块(带着自己的最新Block获取别人的next Block) - return NextBlockPacketBuilder.build(); + //return NextBlockPacketBuilder.build(); + return null; } /** diff --git a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java index b73f46d9229bacedcb8d2095c657c042445275a5..33d474244555809be74749495ccc1526c3eee6d8 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java +++ b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java @@ -7,7 +7,8 @@ import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; -import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; +import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.core.event.NodesConnectedEvent; /** * client端对各个server连接的情况回调。

@@ -20,12 +21,13 @@ public class BlockClientAioListener implements ClientAioListener { @Override public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception { - if (isConnected) { - logger.info("连接成功:server地址为-" + channelContext.getServerNode()); - Aio.bindGroup(channelContext, GROUP_NAME); - } else { - logger.info("连接失败:server地址为-" + channelContext.getServerNode()); - } +// if (isConnected) { +// logger.info("连接成功:server地址为-" + channelContext.getServerNode()); +// Aio.bindGroup(channelContext, Const.GROUP_NAME); +// } else { +// logger.info("连接失败:server地址为-" + channelContext.getServerNode()); +// } + ApplicationContextProvider.publishEvent(new NodesConnectedEvent(channelContext)); } @Override diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index f871f7045b30b038a18cefb3bd17dd7a3a0d42f5..ad185e0cb0fa2dd3b0bcb905052aaea146ca60fd 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -1,11 +1,13 @@ package com.mindata.blockchain.socket.client; +import com.google.common.collect.Maps; import com.mindata.blockchain.common.AppId; import com.mindata.blockchain.common.CommonUtil; import com.mindata.blockchain.core.bean.Member; import com.mindata.blockchain.core.bean.MemberData; import com.mindata.blockchain.core.bean.Permission; import com.mindata.blockchain.core.bean.PermissionData; +import com.mindata.blockchain.core.event.NodesConnectedEvent; import com.mindata.blockchain.core.manager.PermissionManager; import com.mindata.blockchain.socket.common.Const; import com.mindata.blockchain.socket.packet.BlockPacket; @@ -13,22 +15,25 @@ import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import org.tio.client.AioClient; -import org.tio.client.ClientChannelContext; import org.tio.client.ClientGroupContext; import org.tio.core.Aio; +import org.tio.core.ChannelContext; import org.tio.core.Node; import org.tio.utils.lock.SetWithLock; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; @@ -51,10 +56,25 @@ public class ClientStarter { private String appId; @Value("${name}") private String name; + @Value("${singleNode:false}") + private Boolean singleNode; private Logger logger = LoggerFactory.getLogger(getClass()); - private static Set nodes = new HashSet<>(); + private Set nodes = new HashSet<>(); + + // 节点连接状态 + private Map nodesStatus = Maps.newConcurrentMap(); + private volatile boolean isNodesReady = false; // 节点是否已准备好 + + /** + * 初始化权限信息 + * 避免新联盟节点加入时,同步区块而权限未初始化导致同步异常 + */ + @PostConstruct + public void initPermission() { + fetchPermission(); + } /** * 从麦达区块链管理端获取已登记的各服务器ip @@ -63,6 +83,7 @@ public class ClientStarter { @Scheduled(fixedRate = 300000) public void fetchOtherServer() { String localIp = CommonUtil.getLocalIp(); + logger.info("本机IP:{}",localIp); try { //如果连不上服务器,就不让启动 MemberData memberData = restTemplate.getForEntity(managerUrl + "member?name=" + name + "&appId=" + AppId @@ -74,11 +95,14 @@ public class ClientStarter { if (memberData.getCode() == 0) { List memberList = memberData.getMembers(); logger.info("共有" + memberList.size() + "个成员需要连接:" + memberList.toString()); + + nodes.clear(); for (Member member : memberList) { Node node = new Node(member.getIp(), Const.PORT); - //开始尝试绑定到对方开启的server - bindServerGroup(node); + nodes.add(node); } + //开始尝试绑定到对方开启的server + bindServerGroup(nodes); } else { logger.error("不是合法有效的已注册的客户端"); @@ -115,9 +139,20 @@ public class ClientStarter { } - @EventListener(ApplicationReadyEvent.class) - public void fetchNextBlock() throws InterruptedException { - Thread.sleep(6000); + /** + * 每30秒群发一次消息,和别人对比最新的Block + */ + @Scheduled(fixedRate = 30000) + public void heartBeat() { + if(!isNodesReady) { + return; + } + logger.info("---------开始心跳包--------"); + BlockPacket blockPacket = NextBlockPacketBuilder.build(); + packetSender.sendGroup(blockPacket); + } + + public void onNodesReady() { logger.info("开始群发信息获取next Block"); //在这里发请求,去获取group别人的新区块 BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); @@ -125,31 +160,78 @@ public class ClientStarter { } /** - * client在此绑定多个服务器,多个服务器为一个group,将来发消息时发给一个group + * client在此绑定多个服务器,多个服务器为一个group,将来发消息时发给一个group。 + * 此处连接的server的ip需要和服务器端保持一致,服务器删了,这边也要踢出Group */ - private void bindServerGroup(Node serverNode) { + private void bindServerGroup(Set serverNodes) { + //当前已经连接的 + SetWithLock setWithLock = Aio.getAllChannelContexts(clientGroupContext); + Lock lock2 = setWithLock.getLock().readLock(); + lock2.lock(); try { - AioClient aioClient = new AioClient(clientGroupContext); - logger.info("开始绑定" + ":" + serverNode.toString()); - ClientChannelContext clientChannelContext = aioClient.connect(serverNode, 1); - if (clientChannelContext == null) { - logger.info("绑定" + serverNode.toString() + "失败"); - return; + Set set = setWithLock.getObj(); + //已连接的节点集合 + Set connectedNodes = set.stream().map(ChannelContext::getServerNode).collect(Collectors.toSet()); + + //连接新增的,删掉已在管理端不存在的 + for (Node node : serverNodes) { + if (!connectedNodes.contains(node)) { + connect(node); + } } - if (Aio.isInGroup(GROUP_NAME, clientChannelContext)) { - return; + //删掉已经不存在 + for (ChannelContext channelContext : set) { + Node node = channelContext.getServerNode(); + if (!serverNodes.contains(node)) { + Aio.remove(channelContext, "主动关闭" + node.getIp()); + } + } - //绑group是将要连接的各个服务器节点做为一个group - Aio.bindGroup(clientChannelContext, GROUP_NAME); + } finally { + lock2.unlock(); + } + + } + + private void connect(Node serverNode) { + try { + AioClient aioClient = new AioClient(clientGroupContext); + logger.info("开始绑定" + ":" + serverNode.toString()); + aioClient.asynConnect(serverNode); } catch (Exception e) { logger.info("异常"); } + } + + @EventListener(NodesConnectedEvent.class) + public void onConnected(NodesConnectedEvent connectedEvent){ + ChannelContext channelContext = connectedEvent.getSource(); + Node node = channelContext.getServerNode(); + if (channelContext.isClosed()) { + logger.info("连接" + node.toString() + "失败"); + nodesStatus.put(node.getIp(), -1); + return; + }else{ + logger.info("连接" + node.toString() + "成功"); + nodesStatus.put(node.getIp(), 1); + //绑group是将要连接的各个服务器节点做为一个group + Aio.bindGroup(channelContext, GROUP_NAME); + int csize = Aio.getAllChannelContexts(clientGroupContext).size(); + if(csize >= pbftAgreeCount()){ + synchronized (nodesStatus) { + if(!isNodesReady){ + isNodesReady = true; + onNodesReady(); + } + } + } + } } public int halfGroupSize() { - SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); - return ((Set) setWithLock.getObj()).size() / 2; + SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); + return setWithLock.getObj().size() / 2; } /** @@ -164,6 +246,10 @@ public class ClientStarter { if (pbft <= 0) { pbft = 1; } + //如果要单节点测试,此处返回值改为0 + if(singleNode) { + return 0; + } return pbft; } diff --git a/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java b/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java index 3ab2e49cf1b4ffe07f73e925c38807a3fdd1b068..032c81d84af67a3ee42754328db8c1dc48e9a117 100644 --- a/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/distruptor/DisruptorServerHandler.java @@ -1,16 +1,26 @@ package com.mindata.blockchain.socket.distruptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.lmax.disruptor.EventHandler; import com.mindata.blockchain.ApplicationContextProvider; import com.mindata.blockchain.socket.distruptor.base.BaseEvent; +import com.mindata.blockchain.socket.handler.server.PbftVoteHandler; /** * @author wuweifeng wrote on 2018/4/20. */ public class DisruptorServerHandler implements EventHandler { + + private Logger logger = LoggerFactory.getLogger(DisruptorServerHandler.class); @Override public void onEvent(BaseEvent baseEvent, long sequence, boolean endOfBatch) throws Exception { - ApplicationContextProvider.getBean(DisruptorServerConsumer.class).receive(baseEvent); + try { + ApplicationContextProvider.getBean(DisruptorServerConsumer.class).receive(baseEvent); + } catch (Exception e) { + logger.error("Disruptor事件执行异常",e); + } } } diff --git a/src/main/java/com/mindata/blockchain/socket/handler/client/FetchBlockResponseHandler.java b/src/main/java/com/mindata/blockchain/socket/handler/client/FetchBlockResponseHandler.java index 61e10883559996eabfc9883d8ceab2c865c2f9e4..423ab5bca058c09ad2b37ff0d01db3310e9b51cc 100644 --- a/src/main/java/com/mindata/blockchain/socket/handler/client/FetchBlockResponseHandler.java +++ b/src/main/java/com/mindata/blockchain/socket/handler/client/FetchBlockResponseHandler.java @@ -10,6 +10,8 @@ import com.mindata.blockchain.socket.body.RpcCheckBlockBody; import com.mindata.blockchain.socket.client.PacketSender; import com.mindata.blockchain.socket.packet.BlockPacket; import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; +import com.mindata.blockchain.socket.pbft.queue.NextBlockQueue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; @@ -38,6 +40,8 @@ public class FetchBlockResponseHandler extends AbstractBlockHandler { - try { - Thread.sleep(2000); - Block block = ApplicationContextProvider.getBean(DbBlockManager.class).getBlockByHash(rpcBlockBody - .getHash()); - //本地有了 - if (block == null) { - logger.info("开始去获取别人的新区块"); - //在这里发请求,去获取group别人的新区块 - BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); - ApplicationContextProvider.getBean(PacketSender.class).sendGroup(nextBlockPacket); - } - } catch (InterruptedException e) { - e.printStackTrace(); + TimerManager.schedule(() -> { + Block block = ApplicationContextProvider.getBean(DbBlockManager.class).getBlockByHash(rpcBlockBody + .getHash()); + //本地有了 + if (block == null) { + logger.info("开始去获取别人的新区块"); + //在这里发请求,去获取group别人的新区块 + BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); + ApplicationContextProvider.getBean(PacketSender.class).sendGroup(nextBlockPacket); } return null; - }); + },2000); return null; } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java b/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java index 11da0af4bb0fb393db8aaaa17cc1d8ffb03bb032..59ff448dfeb985cd5a18d5da8a29d2374f75194d 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/listener/PrepareEventListener.java @@ -1,18 +1,17 @@ package com.mindata.blockchain.socket.pbft.listener; -import com.mindata.blockchain.common.AppId; +import javax.annotation.Resource; + +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + import com.mindata.blockchain.socket.body.VoteBody; import com.mindata.blockchain.socket.client.PacketSender; import com.mindata.blockchain.socket.packet.BlockPacket; import com.mindata.blockchain.socket.packet.PacketBuilder; import com.mindata.blockchain.socket.packet.PacketType; -import com.mindata.blockchain.socket.pbft.VoteType; import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; import com.mindata.blockchain.socket.pbft.msg.VoteMsg; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; /** * @author wuweifeng wrote on 2018/4/25. @@ -31,8 +30,6 @@ public class PrepareEventListener { @EventListener public void msgIsPrepare(MsgPrepareEvent msgPrepareEvent) { VoteMsg voteMsg = (VoteMsg) msgPrepareEvent.getSource(); - voteMsg.setVoteType(VoteType.PREPARE); - voteMsg.setAppId(AppId.value); //群发消息,通知别的节点,我已对该Block Prepare BlockPacket blockPacket = new PacketBuilder<>().setType(PacketType.PBFT_VOTE).setBody(new diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java index 365b180f8a9d0848bc1cc13f692be1f21d8f0d04..c3027ac8283306ee4388f4c52b5898464622cd7c 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/AbstractVoteMsgQueue.java @@ -1,15 +1,17 @@ package com.mindata.blockchain.socket.pbft.queue; -import cn.hutool.core.collection.CollectionUtil; -import com.mindata.blockchain.socket.pbft.msg.VoteMsg; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mindata.blockchain.common.timer.TimerManager; +import com.mindata.blockchain.socket.pbft.msg.VoteMsg; + +import cn.hutool.core.collection.CollectionUtil; + /** * @author wuweifeng wrote on 2018/4/26. */ @@ -33,18 +35,19 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { List voteMsgs = voteMsgConcurrentHashMap.get(hash); if (CollectionUtil.isEmpty(voteMsgs)) { voteMsgs = new ArrayList<>(); - voteMsgs.add(voteMsg); voteMsgConcurrentHashMap.put(hash, voteMsgs); - } - //判断本地集合是否已经存在完全相同的voteMsg了 - for (VoteMsg temp : voteMsgs) { - if (temp.getNumber() == voteMsg.getNumber() && temp.getAppId().equals(voteMsg.getAppId())) { - return; + } else { + //如果不空的情况下,判断本地集合是否已经存在完全相同的voteMsg了 + for (VoteMsg temp : voteMsgs) { + if (temp.getAppId().equals(voteMsg.getAppId())) { + return; + } } } + //添加进去 voteMsgs.add(voteMsg); - //如果我已经对该hash的commit投过票了,就不再继续 + //如果已经对该hash投过票了,就不再继续 if (voteStateConcurrentHashMap.get(hash) != null) { return; } @@ -53,21 +56,25 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { } /** - * 校验队列中是否存在number相同,hash不同,且被同意数量已经超过过2f+1的记录 + * 该方法用来确认待push阶段的下一阶段是否已存在已达成共识的Block

+ * 譬如收到了区块5的Prepare的投票信息,那么我是否接受该投票,需要先校验Commit阶段是否已经存在number>=5的投票成功信息 * * @param hash * hash * @return 是否超过 */ public boolean hasOtherConfirm(String hash, int number) { + //遍历该阶段的所有投票信息 for (String key : voteMsgConcurrentHashMap.keySet()) { + //如果下一阶段存在同一个hash的投票,则不理会 if (hash.equals(key)) { continue; } + //如果下一阶段的number比当前投票的小,则不理会 if (voteMsgConcurrentHashMap.get(key).get(0).getNumber() < number) { continue; } - //如果有别的>=number的Block已经达成共识了,则返回true + //只有别的>=number的Block已经达成共识了,则返回true,那么将会拒绝该hash进入下一阶段 if (voteStateConcurrentHashMap.get(key) != null && voteStateConcurrentHashMap.get(key)) { return true; } @@ -79,12 +86,7 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { * 清理旧的block的hash */ protected void clearOldBlockHash(int number) { - CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + TimerManager.schedule(() -> { for (String key : voteMsgConcurrentHashMap.keySet()) { if (voteMsgConcurrentHashMap.get(key).get(0).getNumber() <= number) { voteMsgConcurrentHashMap.remove(key); @@ -92,6 +94,6 @@ public abstract class AbstractVoteMsgQueue extends BaseMsgQueue { } } return null; - }); + },2000); } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java index 11dea949d77851c6dfd9d8e6ebe1f39aedaf0f9b..5e2a9443cd0ca4fe7ff7dbd56c75addc6e1658f5 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/CommitMsgQueue.java @@ -1,17 +1,19 @@ package com.mindata.blockchain.socket.pbft.queue; -import com.mindata.blockchain.ApplicationContextProvider; -import com.mindata.blockchain.block.Block; -import com.mindata.blockchain.core.event.AddBlockEvent; -import com.mindata.blockchain.socket.pbft.msg.VoteMsg; +import java.util.List; + +import javax.annotation.Resource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.util.List; +import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.block.Block; +import com.mindata.blockchain.core.event.AddBlockEvent; +import com.mindata.blockchain.socket.pbft.msg.VoteMsg; /** * Confirm阶段的消息队列 @@ -30,11 +32,6 @@ public class CommitMsgQueue extends AbstractVoteMsgQueue { protected void deal(VoteMsg voteMsg, List voteMsgs) { String hash = voteMsg.getHash(); - //如果已经落地过了 - if (voteStateConcurrentHashMap.get(hash) != null) { - return; - } - //通过校验agree数量,来决定是否在本地生成Block long count = voteMsgs.stream().filter(VoteMsg::isAgree).count(); logger.info("已经commit为true的数量为:"+ count); diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/MsgQueueManager.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/MsgQueueManager.java index f574ae3cc723604295c984930b0c95d12328f20a..8e9169c712fbdd36aa5068e25cb6b241f061b306 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/MsgQueueManager.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/MsgQueueManager.java @@ -10,9 +10,9 @@ import org.springframework.stereotype.Component; */ @Component public class MsgQueueManager { - private BaseMsgQueue baseMsgQueue; public void pushMsg(VoteMsg voteMsg) { + BaseMsgQueue baseMsgQueue = null; switch (voteMsg.getVoteType()) { case VoteType .PREPREPARE: @@ -27,7 +27,8 @@ public class MsgQueueManager { default: break; } - - baseMsgQueue.push(voteMsg); + if(baseMsgQueue != null) { + baseMsgQueue.push(voteMsg); + } } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/NextBlockQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/NextBlockQueue.java index a87cc3bd8188aac3c9f74d4e3616a39e963fa065..a15a0fbcd46f4aa124090c9696a185570be3e6d6 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/NextBlockQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/NextBlockQueue.java @@ -1,6 +1,8 @@ package com.mindata.blockchain.socket.pbft.queue; import cn.hutool.core.util.StrUtil; + +import com.google.common.collect.Lists; import com.mindata.blockchain.core.manager.DbBlockManager; import com.mindata.blockchain.socket.body.BlockHash; import com.mindata.blockchain.socket.body.RpcSimpleBlockBody; @@ -38,6 +40,18 @@ public class NextBlockQueue { * prevHash->hash,记录上一区块hash和hash的映射 */ private ConcurrentHashMap> requestMap = new ConcurrentHashMap<>(); + + /** + * 保存已经通过的区块hash,用于后面校验落地区块 + */ + private List wantHashs = Lists.newCopyOnWriteArrayList(); + + public String pop(String hash) { + if(wantHashs.remove(hash)) { + return hash; + } + return null; + } public List get(String key) { return requestMap.get(key); @@ -133,6 +147,7 @@ public class NextBlockQueue { //判断数量是否过线 if (maxCount >= agreeCount - 1) { logger.info("共有<" + maxCount + ">个节点返回next block hash为" + wantHash); + wantHashs.add(wantHash); //请求拉取该hash的Block BlockPacket blockPacket = new PacketBuilder().setType(PacketType .FETCH_BLOCK_INFO_REQUEST).setBody(new RpcSimpleBlockBody(wantHash)).build(); diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java index fcd4da26e017fb0a376c70841c9b991f68075e30..9bcaf34d5069ef4e6a315020c04292ae4e4fea06 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PreMsgQueue.java @@ -1,7 +1,12 @@ package com.mindata.blockchain.socket.pbft.queue; +import cn.hutool.core.bean.BeanUtil; import com.mindata.blockchain.block.Block; +import com.mindata.blockchain.common.AppId; +import com.mindata.blockchain.common.timer.TimerManager; import com.mindata.blockchain.core.event.AddBlockEvent; +import com.mindata.blockchain.core.sqlite.SqliteManager; +import com.mindata.blockchain.socket.pbft.VoteType; import com.mindata.blockchain.socket.pbft.event.MsgPrepareEvent; import com.mindata.blockchain.socket.pbft.msg.VoteMsg; import com.mindata.blockchain.socket.pbft.msg.VotePreMsg; @@ -13,7 +18,6 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** @@ -23,6 +27,8 @@ import java.util.concurrent.ConcurrentHashMap; */ @Component public class PreMsgQueue extends BaseMsgQueue { + @Resource + private SqliteManager sqliteManager; @Resource private PrepareMsgQueue prepareMsgQueue; @Resource @@ -47,11 +53,24 @@ public class PreMsgQueue extends BaseMsgQueue { logger.info("拒绝进入Prepare阶段,hash为" + hash); return; } + // 检测脚本是否正常 + try { + sqliteManager.tryExecute(votePreMsg.getBlock()); + } catch (Exception e) { + // 执行异常 + logger.info("sql指令预执行失败"); + return; + } + //存入Pre集合中 blockConcurrentHashMap.put(hash, votePreMsg); //加入Prepare行列,推送给所有人 - eventPublisher.publishEvent(new MsgPrepareEvent(voteMsg)); + VoteMsg prepareMsg = new VoteMsg(); + BeanUtil.copyProperties(voteMsg, prepareMsg); + prepareMsg.setVoteType(VoteType.PREPARE); + prepareMsg.setAppId(AppId.value); + eventPublisher.publishEvent(new MsgPrepareEvent(prepareMsg)); } /** @@ -77,18 +96,13 @@ public class PreMsgQueue extends BaseMsgQueue { public void blockGenerated(AddBlockEvent addBlockEvent) { Block block = (Block) addBlockEvent.getSource(); int number = block.getBlockHeader().getNumber(); - CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + TimerManager.schedule(() -> { for (String key : blockConcurrentHashMap.keySet()) { if (blockConcurrentHashMap.get(key).getNumber() <= number) { blockConcurrentHashMap.remove(key); } } return null; - }); + }, 2000); } } diff --git a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java index 0ba607d13896dda6e6946e7c570193f77884b9f3..5cff848f713a09c09b625d33300e10a1a364314b 100644 --- a/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java +++ b/src/main/java/com/mindata/blockchain/socket/pbft/queue/PrepareMsgQueue.java @@ -39,11 +39,6 @@ public class PrepareMsgQueue extends AbstractVoteMsgQueue { @Override protected void deal(VoteMsg voteMsg, List voteMsgs) { String hash = voteMsg.getHash(); - //如果我已经对该hash的commit投过票了,就不再继续 - if (voteStateConcurrentHashMap.get(hash) != null) { - return; - } - VoteMsg commitMsg = new VoteMsg(); BeanUtil.copyProperties(voteMsg, commitMsg); commitMsg.setVoteType(VoteType.COMMIT); @@ -71,12 +66,10 @@ public class PrepareMsgQueue extends AbstractVoteMsgQueue { logger.info("Prepare阶段完毕,是否进入commit的标志是:" + flag); //发出拒绝commit的消息 commitMsg.setAgree(flag); - eventPublisher.publishEvent(new MsgCommitEvent(commitMsg)); voteStateConcurrentHashMap.put(commitMsg.getHash(), flag); + eventPublisher.publishEvent(new MsgCommitEvent(commitMsg)); } - - /** * 判断大家是否已对其他的Block达成共识,如果true,则拒绝即将进入队列的Block * diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index c3b8cd1f7246e5411a08696b9072030d2062db5d..a08dfc5b8fd7838b53c7100bcc560c7c0aaaaf88 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,7 +8,11 @@ sqlite: db: levelDB: false #在部分Windows机器上rocksDB会报错dll找不到,那么就用levelDB来替代 rocksDB: true +singleNode: true #如果是单节点测试,改成true version: 1 name: ${NAME:maida} appId: ${APP_ID:wolf} -managerUrl: ${MANAGER_URL:http://localhost:8888/} \ No newline at end of file +managerUrl: ${MANAGER_URL:http://localhost:8888/} +publicKey: A8WLqHTjcT/FQ2IWhIePNShUEcdCzu5dG+XrQU8OMu54 +privateKey: yScdp6fNgUU+cRUTygvJG4EBhDKmOMRrK4XJ9mKVQJ8= +singeNode: true #单节点测试模式 \ No newline at end of file diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 766ceeb8b9fec4dcf686fa9b5b7e1e2c31f7605d..8d2894044e48472d7ba973fee36d4e8e37ee1243 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -1,8 +1,17 @@ - + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + + + + true @@ -22,7 +31,7 @@ - + \ No newline at end of file diff --git a/src/test/java/com/mindata/blockchain/MdBlockchainApplicationTests.java b/src/test/java/com/mindata/blockchain/MdBlockchainApplicationTests.java index c6bdc3fdb1440f1590cb6f52b38047224f3b0ac4..996a379cf3ba99172276fef8efc0fe0fd1b8dad8 100644 --- a/src/test/java/com/mindata/blockchain/MdBlockchainApplicationTests.java +++ b/src/test/java/com/mindata/blockchain/MdBlockchainApplicationTests.java @@ -11,6 +11,7 @@ public class MdBlockchainApplicationTests { @Test public void contextLoads() { + //TODO 需要测试高并发下生成block的速度和冲突情况 } }