diff --git a/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..d2690136c3c68a6f50b5e5f77994d83ce8479c66 --- /dev/null +++ b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java @@ -0,0 +1,21 @@ +package com.mindata.blockchain.core.event; + +import org.springframework.context.ApplicationEvent; +import org.tio.core.ChannelContext; + +/** + * 节点连接完成时会触发该Event + * @author andylo25 wrote on 2018/6/15. + */ +public class NodesConnectedEvent extends ApplicationEvent { + private static final long serialVersionUID = 526755692642414178L; + + public NodesConnectedEvent(ChannelContext channelContext) { + super(channelContext); + } + + public ChannelContext getSource() { + return (ChannelContext) source; + } + +} diff --git a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java index b81ba06c27b0d6e550a1e71260f025cf689bc3c3..097c7a14d6bf0cf4c789c28d1884ae2b76ba63e0 100644 --- a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java +++ b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java @@ -1,7 +1,5 @@ package com.mindata.blockchain.core.repository; -import org.springframework.transaction.annotation.Transactional; - import com.mindata.blockchain.core.model.MessageEntity; /** @@ -12,7 +10,6 @@ public interface MessageRepository extends BaseRepository { * 删除一条记录 * @param messageId messageId */ - @Transactional void deleteByMessageId(String messageId); /** diff --git a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java index ca694973877f83b6d3d26ef61f8efb9566e03946..af1d7901ddbe43799d18934125b7dd65f6eb0f30 100644 --- a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java +++ b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java @@ -70,11 +70,13 @@ public class SqliteManager { /** * 根据一个block执行sql - * + * 整个block一个事务 + * * @param block * block */ - private void execute(Block block) { + @Transactional + public void execute(Block block) { List instructions = block.getBlockBody().getInstructions(); //InstructionParserImpl类里面执行的是InstructionBase,需要转成InstructionBase for (Instruction instruction : instructions) { diff --git a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java index b73f46d9229bacedcb8d2095c657c042445275a5..33d474244555809be74749495ccc1526c3eee6d8 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java +++ b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java @@ -7,7 +7,8 @@ import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; -import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; +import com.mindata.blockchain.ApplicationContextProvider; +import com.mindata.blockchain.core.event.NodesConnectedEvent; /** * client端对各个server连接的情况回调。

@@ -20,12 +21,13 @@ public class BlockClientAioListener implements ClientAioListener { @Override public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception { - if (isConnected) { - logger.info("连接成功:server地址为-" + channelContext.getServerNode()); - Aio.bindGroup(channelContext, GROUP_NAME); - } else { - logger.info("连接失败:server地址为-" + channelContext.getServerNode()); - } +// if (isConnected) { +// logger.info("连接成功:server地址为-" + channelContext.getServerNode()); +// Aio.bindGroup(channelContext, Const.GROUP_NAME); +// } else { +// logger.info("连接失败:server地址为-" + channelContext.getServerNode()); +// } + ApplicationContextProvider.publishEvent(new NodesConnectedEvent(channelContext)); } @Override diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java index 43acf292463597154da5c8e86c71e38796a9f1e3..404382116ee72b9a0bd9e14cb7535e11dedca3e6 100644 --- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java +++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java @@ -1,19 +1,19 @@ package com.mindata.blockchain.socket.client; -import com.mindata.blockchain.common.AppId; -import com.mindata.blockchain.common.CommonUtil; -import com.mindata.blockchain.core.bean.Member; -import com.mindata.blockchain.core.bean.MemberData; -import com.mindata.blockchain.core.bean.Permission; -import com.mindata.blockchain.core.bean.PermissionData; -import com.mindata.blockchain.core.manager.PermissionManager; -import com.mindata.blockchain.socket.common.Const; -import com.mindata.blockchain.socket.packet.BlockPacket; -import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; +import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; + +import javax.annotation.Resource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -26,14 +26,18 @@ import org.tio.core.ChannelContext; import org.tio.core.Node; import org.tio.utils.lock.SetWithLock; -import javax.annotation.Resource; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.stream.Collectors; - -import static com.mindata.blockchain.socket.common.Const.GROUP_NAME; +import com.google.common.collect.Maps; +import com.mindata.blockchain.common.AppId; +import com.mindata.blockchain.common.CommonUtil; +import com.mindata.blockchain.core.bean.Member; +import com.mindata.blockchain.core.bean.MemberData; +import com.mindata.blockchain.core.bean.Permission; +import com.mindata.blockchain.core.bean.PermissionData; +import com.mindata.blockchain.core.event.NodesConnectedEvent; +import com.mindata.blockchain.core.manager.PermissionManager; +import com.mindata.blockchain.socket.common.Const; +import com.mindata.blockchain.socket.packet.BlockPacket; +import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder; /** * @author wuweifeng wrote on 2018/3/18. @@ -59,7 +63,11 @@ public class ClientStarter { private Logger logger = LoggerFactory.getLogger(getClass()); - private static Set nodes = new HashSet<>(); + private Set nodes = new HashSet<>(); + + // 节点连接状态 + private Map nodesStatus = Maps.newConcurrentMap(); + private volatile boolean isNodesReady = false; // 节点是否已准备好 /** * 从麦达区块链管理端获取已登记的各服务器ip @@ -129,14 +137,13 @@ public class ClientStarter { */ @Scheduled(fixedRate = 30000) public void heartBeat() { + if(!isNodesReady)return; logger.info("---------开始心跳包--------"); BlockPacket blockPacket = NextBlockPacketBuilder.build(); packetSender.sendGroup(blockPacket); } - @EventListener(ApplicationReadyEvent.class) - public void fetchNextBlock() throws InterruptedException { - Thread.sleep(6000); + public void onNodesReady() { logger.info("开始群发信息获取next Block"); //在这里发请求,去获取group别人的新区块 BlockPacket nextBlockPacket = NextBlockPacketBuilder.build(); @@ -181,21 +188,41 @@ public class ClientStarter { try { AioClient aioClient = new AioClient(clientGroupContext); logger.info("开始绑定" + ":" + serverNode.toString()); - ClientChannelContext clientChannelContext = aioClient.connect(serverNode, 2); - if (clientChannelContext == null) { - logger.info("绑定" + serverNode.toString() + "失败"); - return; - } - //绑group是将要连接的各个服务器节点做为一个group - Aio.bindGroup(clientChannelContext, GROUP_NAME); + aioClient.asynConnect(serverNode); } catch (Exception e) { logger.info("异常"); } } + + @EventListener(NodesConnectedEvent.class) + public void onConnected(NodesConnectedEvent connectedEvent){ + ChannelContext channelContext = connectedEvent.getSource(); + Node node = channelContext.getServerNode(); + if (channelContext.isClosed()) { + logger.info("连接" + node.toString() + "失败"); + nodesStatus.put(node.getIp(), -1); + return; + }else{ + logger.info("连接" + node.toString() + "成功"); + nodesStatus.put(node.getIp(), 1); + //绑group是将要连接的各个服务器节点做为一个group + Aio.bindGroup(channelContext, GROUP_NAME); + + int csize = Aio.getAllChannelContexts(clientGroupContext).size(); + if(csize >= pbftAgreeCount()){ + synchronized (nodesStatus) { + if(!isNodesReady){ + isNodesReady = true; + onNodesReady(); + } + } + } + } + } public int halfGroupSize() { - SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); - return ((Set) setWithLock.getObj()).size() / 2; + SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME); + return setWithLock.getObj().size() / 2; } /**