diff --git a/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java
new file mode 100644
index 0000000000000000000000000000000000000000..d2690136c3c68a6f50b5e5f77994d83ce8479c66
--- /dev/null
+++ b/src/main/java/com/mindata/blockchain/core/event/NodesConnectedEvent.java
@@ -0,0 +1,21 @@
+package com.mindata.blockchain.core.event;
+
+import org.springframework.context.ApplicationEvent;
+import org.tio.core.ChannelContext;
+
+/**
+ * 节点连接完成时会触发该Event
+ * @author andylo25 wrote on 2018/6/15.
+ */
+public class NodesConnectedEvent extends ApplicationEvent {
+ private static final long serialVersionUID = 526755692642414178L;
+
+ public NodesConnectedEvent(ChannelContext channelContext) {
+ super(channelContext);
+ }
+
+ public ChannelContext getSource() {
+ return (ChannelContext) source;
+ }
+
+}
diff --git a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java
index b81ba06c27b0d6e550a1e71260f025cf689bc3c3..097c7a14d6bf0cf4c789c28d1884ae2b76ba63e0 100644
--- a/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java
+++ b/src/main/java/com/mindata/blockchain/core/repository/MessageRepository.java
@@ -1,7 +1,5 @@
package com.mindata.blockchain.core.repository;
-import org.springframework.transaction.annotation.Transactional;
-
import com.mindata.blockchain.core.model.MessageEntity;
/**
@@ -12,7 +10,6 @@ public interface MessageRepository extends BaseRepository {
* 删除一条记录
* @param messageId messageId
*/
- @Transactional
void deleteByMessageId(String messageId);
/**
diff --git a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java
index ca694973877f83b6d3d26ef61f8efb9566e03946..af1d7901ddbe43799d18934125b7dd65f6eb0f30 100644
--- a/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java
+++ b/src/main/java/com/mindata/blockchain/core/sqlite/SqliteManager.java
@@ -70,11 +70,13 @@ public class SqliteManager {
/**
* 根据一个block执行sql
- *
+ * 整个block一个事务
+ *
* @param block
* block
*/
- private void execute(Block block) {
+ @Transactional
+ public void execute(Block block) {
List instructions = block.getBlockBody().getInstructions();
//InstructionParserImpl类里面执行的是InstructionBase,需要转成InstructionBase
for (Instruction instruction : instructions) {
diff --git a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java
index b73f46d9229bacedcb8d2095c657c042445275a5..33d474244555809be74749495ccc1526c3eee6d8 100644
--- a/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java
+++ b/src/main/java/com/mindata/blockchain/socket/client/BlockClientAioListener.java
@@ -7,7 +7,8 @@ import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
-import static com.mindata.blockchain.socket.common.Const.GROUP_NAME;
+import com.mindata.blockchain.ApplicationContextProvider;
+import com.mindata.blockchain.core.event.NodesConnectedEvent;
/**
* client端对各个server连接的情况回调。
@@ -20,12 +21,13 @@ public class BlockClientAioListener implements ClientAioListener {
@Override
public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
- if (isConnected) {
- logger.info("连接成功:server地址为-" + channelContext.getServerNode());
- Aio.bindGroup(channelContext, GROUP_NAME);
- } else {
- logger.info("连接失败:server地址为-" + channelContext.getServerNode());
- }
+// if (isConnected) {
+// logger.info("连接成功:server地址为-" + channelContext.getServerNode());
+// Aio.bindGroup(channelContext, Const.GROUP_NAME);
+// } else {
+// logger.info("连接失败:server地址为-" + channelContext.getServerNode());
+// }
+ ApplicationContextProvider.publishEvent(new NodesConnectedEvent(channelContext));
}
@Override
diff --git a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java
index 43acf292463597154da5c8e86c71e38796a9f1e3..404382116ee72b9a0bd9e14cb7535e11dedca3e6 100644
--- a/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java
+++ b/src/main/java/com/mindata/blockchain/socket/client/ClientStarter.java
@@ -1,19 +1,19 @@
package com.mindata.blockchain.socket.client;
-import com.mindata.blockchain.common.AppId;
-import com.mindata.blockchain.common.CommonUtil;
-import com.mindata.blockchain.core.bean.Member;
-import com.mindata.blockchain.core.bean.MemberData;
-import com.mindata.blockchain.core.bean.Permission;
-import com.mindata.blockchain.core.bean.PermissionData;
-import com.mindata.blockchain.core.manager.PermissionManager;
-import com.mindata.blockchain.socket.common.Const;
-import com.mindata.blockchain.socket.packet.BlockPacket;
-import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder;
+import static com.mindata.blockchain.socket.common.Const.GROUP_NAME;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+
+import javax.annotation.Resource;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -26,14 +26,18 @@ import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.utils.lock.SetWithLock;
-import javax.annotation.Resource;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.stream.Collectors;
-
-import static com.mindata.blockchain.socket.common.Const.GROUP_NAME;
+import com.google.common.collect.Maps;
+import com.mindata.blockchain.common.AppId;
+import com.mindata.blockchain.common.CommonUtil;
+import com.mindata.blockchain.core.bean.Member;
+import com.mindata.blockchain.core.bean.MemberData;
+import com.mindata.blockchain.core.bean.Permission;
+import com.mindata.blockchain.core.bean.PermissionData;
+import com.mindata.blockchain.core.event.NodesConnectedEvent;
+import com.mindata.blockchain.core.manager.PermissionManager;
+import com.mindata.blockchain.socket.common.Const;
+import com.mindata.blockchain.socket.packet.BlockPacket;
+import com.mindata.blockchain.socket.packet.NextBlockPacketBuilder;
/**
* @author wuweifeng wrote on 2018/3/18.
@@ -59,7 +63,11 @@ public class ClientStarter {
private Logger logger = LoggerFactory.getLogger(getClass());
- private static Set nodes = new HashSet<>();
+ private Set nodes = new HashSet<>();
+
+ // 节点连接状态
+ private Map nodesStatus = Maps.newConcurrentMap();
+ private volatile boolean isNodesReady = false; // 节点是否已准备好
/**
* 从麦达区块链管理端获取已登记的各服务器ip
@@ -129,14 +137,13 @@ public class ClientStarter {
*/
@Scheduled(fixedRate = 30000)
public void heartBeat() {
+ if(!isNodesReady)return;
logger.info("---------开始心跳包--------");
BlockPacket blockPacket = NextBlockPacketBuilder.build();
packetSender.sendGroup(blockPacket);
}
- @EventListener(ApplicationReadyEvent.class)
- public void fetchNextBlock() throws InterruptedException {
- Thread.sleep(6000);
+ public void onNodesReady() {
logger.info("开始群发信息获取next Block");
//在这里发请求,去获取group别人的新区块
BlockPacket nextBlockPacket = NextBlockPacketBuilder.build();
@@ -181,21 +188,41 @@ public class ClientStarter {
try {
AioClient aioClient = new AioClient(clientGroupContext);
logger.info("开始绑定" + ":" + serverNode.toString());
- ClientChannelContext clientChannelContext = aioClient.connect(serverNode, 2);
- if (clientChannelContext == null) {
- logger.info("绑定" + serverNode.toString() + "失败");
- return;
- }
- //绑group是将要连接的各个服务器节点做为一个group
- Aio.bindGroup(clientChannelContext, GROUP_NAME);
+ aioClient.asynConnect(serverNode);
} catch (Exception e) {
logger.info("异常");
}
}
+
+ @EventListener(NodesConnectedEvent.class)
+ public void onConnected(NodesConnectedEvent connectedEvent){
+ ChannelContext channelContext = connectedEvent.getSource();
+ Node node = channelContext.getServerNode();
+ if (channelContext.isClosed()) {
+ logger.info("连接" + node.toString() + "失败");
+ nodesStatus.put(node.getIp(), -1);
+ return;
+ }else{
+ logger.info("连接" + node.toString() + "成功");
+ nodesStatus.put(node.getIp(), 1);
+ //绑group是将要连接的各个服务器节点做为一个group
+ Aio.bindGroup(channelContext, GROUP_NAME);
+
+ int csize = Aio.getAllChannelContexts(clientGroupContext).size();
+ if(csize >= pbftAgreeCount()){
+ synchronized (nodesStatus) {
+ if(!isNodesReady){
+ isNodesReady = true;
+ onNodesReady();
+ }
+ }
+ }
+ }
+ }
public int halfGroupSize() {
- SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME);
- return ((Set) setWithLock.getObj()).size() / 2;
+ SetWithLock setWithLock = clientGroupContext.groups.clients(clientGroupContext, Const.GROUP_NAME);
+ return setWithLock.getObj().size() / 2;
}
/**