同步操作将从 zkpursuit/kaka-core 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
模块为全局事件驱动框架,无任何第三方依赖;支持同步或者异步获取事件处理结果;可解耦业务,简化程序复杂性,提高代码可读性,降低开发维护成本。
基于观察者和命令模式,
<dependency>
<groupId>io.github.zkpursuit</groupId>
<artifactId>kaka-core</artifactId>
<version>5.6</version>
</dependency>
通过Startup.scan方法扫描指定包下的Command、Proxy、Mediator子类并将其注册到Facade中,Command、Proxy、Mediator亦可直接使用Facade对应的方法手动注册;由Facade处理事件流向。
Command只能监听注册到Facade中的事件,可多个事件注册同一个Command(也可理解为一个Command可监听多个事件),而Mediator则是监听多个自身感兴趣的事件,具体对哪些事件感兴趣则由listMessageInterests方法的返回值决定(总结:一个Command可以对应多个事件;一个事件可以对应多个Mediator,一个Mediator可以对应多个事件;一个事件可以同时对应多个Command和多个Mediator;Command为动态创建,但可池化,Mediator为全局唯一);Command、Mediator是功能非常相似的事件监听器和事件派发器。
@Handler(cmd="A", type=MyEnum.class)
其中"A"为MyEnum中的枚举项
基于此模型构建的斗地主开放源代码 https://gitee.com/zkpursuit/fight-against-landlords ,游戏体验地址 http://101.34.22.36:8080/ , 癞子玩法不支持机器人,需要开三个标签页,并需在匹配时间段(5秒)内同时进入游戏。
import com.kaka.Startup;
import com.kaka.notice.*;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 异步使用范例
*
* @author zkpursuit
*/
public class Test extends Startup {
public static void main(String[] args) {
Facade facade = FacadeFactory.getFacade();
Test test = new Test();
test.scan("com.test.units"); //扫描类包注册事件
facade.initThreadPool(Executors.newFixedThreadPool(2)); //全局仅设置一次
//同步发送事件通知
facade.sendMessage(new Message("1000", "让MyCommand接收执行"));
//简单的异步发送事件通知
facade.sendMessage(new Message("2000", "让MyMediator和MyCommand接收执行"), true);
/*
1、以下为测试发送事件通知后获得事件处理器的处理结果。
2、一般情况我们不一定需要此功能,为了尽可能的减少对象创建,故而
在需要使用此功能时手动创建AsynResult或者SyncResult对象。
3、我们应该尽可能的使用事件模式代替,比如事件处理器处理完成后再次
调用sendMessage向外派发事件,分散到其它事件处理器中处理,而不是等待处
理结果。
4、异步future模式获取事件处理结果其本质是利用wait、notify(notifyAll)
实现,而使用事件模式则无需调用wait让线程中断等待。
*/
//获取异步处理结果
Message asynMsg = new Message("10000", "让ResultCommand接收执行");
//由于事件通知为广播模式,故而必须为执行结果进行命名标识唯一性
IResult<String> result0 = asynMsg.setResult("ResultMsg", new AsynResult<>(12000));
facade.sendMessage(asynMsg, true); //异步发送事件通知
System.out.println(result0.get());
//获取同步执行结果
Message syncMsg = new Message("20000", "让ResultCommand接收执行");
//由于事件通知为广播模式,故而必须为执行结果进行命名标识唯一性
IResult<String> result1 = syncMsg.setResult("ResultMsg", new SyncResult<>());
facade.sendMessage(syncMsg, false); //同步发送事件通知
System.out.println(result1.get());
//另一种异步处理方式,同步派发事件,事件处理器中使用FutureTask及线程异步获取执行结果
Message syncMsg1 = new Message("30000", "让FutureCommand接收执行");
IResult<String> result2 = syncMsg1.setResult("ResultMsg", new SyncResult<>());
facade.sendMessage(syncMsg1, false); //同步发送事件通知
System.out.println(result2.get());
//哈哈,异步中的异步,其实没必要
Message syncMsg2 = new Message("30000", "让FutureCommand接收执行");
IResult<String> result3 = syncMsg2.setResult("ResultMsg", new AsynResult<>());
facade.sendMessage(syncMsg2, true); //异步发送事件通知
System.out.println(result3.get());
//基于事件模拟切面编程,仅支持Command
facade.sendMessage(new Message("40000"), true);
//异步回调获取事件执行结果
facade.sendMessage(new Message("50000", "", (IResult<Object> result) -> {
String clasz = ((CallbackResult<Object>) result).eventHanderClass;
StringBuilder sb = new StringBuilder("异步回调:\t" + clasz + "\t");
Object resultObj = result.get();
if (resultObj instanceof Object[]) {
Object[] ps = (Object[]) resultObj;
sb.append(Arrays.toString(ps));
} else {
sb.append(resultObj);
}
System.out.println(sb);
}), true);
facade.initScheduleThreadPool(Executors.newScheduledThreadPool(2));
long c = System.currentTimeMillis();
Scheduler scheduler = Scheduler.create("com/test/units")
.startTime(c + 3000) //3秒后开始执行
.endTime(c + 7000) //调度执行结束时间点
.interval(2000, TimeUnit.MILLISECONDS) //执行间隔
.repeat(5); //执行次数
//此处的执行次数为5次,但因执行到某次时超出设置的结束时间,故而实际次数将少于5次
facade.sendMessage(new Message("1000", "让MyCommand接收执行"), scheduler);
}
}
import com.kaka.Startup;
import com.kaka.notice.*;
import java.util.Arrays;
import java.util.concurrent.Executors;
/**
* 本类中使用的activeMQ或RecketMQ均为最新版本
*
* @author zkpursuit
*/
public class Remote_Test extends Startup {
public static void main(String[] args) throws Exception {
Facade facade = FacadeFactory.getFacade();
Remote_Test test = new Remote_Test();
test.scan("kaka.test.unit");
facade.initThreadPool(Executors.newFixedThreadPool(2));
//以下通过ActiveMQ消息队列消费处理事件,并获得事件处理结果
facade.initRemoteMessagePostman(new ActiveMQ("event_exec_before", "event_exec_after")); //此行全局一次设定
//facade.initRemoteMessagePostman(new RocketMQ("event_exec_before", "event_exec_after"));
Message message = new Message("20000", "让ResultCommand接收执行");
IResult<String> result4 = message.setResult("ResultMsg", new AsynLatchResult<>()); //AsynLatchResult可用AsynResult替代
facade.sendRemoteMessage(message);
// try {
// System.out.println("消息队列消费处理事件结果:" + ((AsynLatchResult) result4).get(5, TimeUnit.SECONDS));
// } catch (TimeoutException ex) {
// System.out.println("获取结果超时");
// }
System.out.println("消息队列消费处理事件结果:" + result4.get()); //一直等待结果
facade.sendRemoteMessage(new Message("40000", "", (IResult<Object> result) -> {
String clasz = ((CallbackResult<Object>) result).eventHandlerClass;
StringBuilder sb = new StringBuilder("消息队列消费处理事件结果异步回调:\t" + clasz + "\t");
Object resultObj = result.get();
if (resultObj instanceof Object[]) {
Object[] ps = (Object[]) resultObj;
sb.append(Arrays.toString(ps));
} else {
sb.append(resultObj);
}
System.out.println(sb);
}));
}
}
package com.test.units;
import com.kaka.notice.Command;
import com.kaka.notice.IResult;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.Handler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author zkpursuit
*/
@Handler(cmd = "30000", type = String.class)
public class FutureCommand extends Command {
@Override
public void execute(Message msg) {
FutureTask<String> ft = new FutureTask<>(() -> {
Thread.sleep(3000); //模拟耗时操作
return ">>>>>>>>异步执行结果";
});
new Thread(ft).start();
try {
IResult result = msg.getResult("ResultMsg");
if (result != null) {
result.set(ft.get());
}
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(FutureCommand.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
package com.test.units;
import com.kaka.notice.Command;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.Handler;
/**
*
* @author zkpursuit
*/
@Handler(cmd = "1000", type = String.class)
@Handler(cmd = "2000", type = String.class)
public class MyCommand extends Command {
@Override
public void execute(Message msg) {
System.out.println(MyCommand.class.getTypeName() + " -> execute " + msg.getWhat() + " 绑定的数据:" + msg.getBody());
//MyProxy proxy = this.getProxy(MyProxy.class);
//proxy.func();
//this.sendMessage(new Message("3000", "让MyMediator接收执行"));
}
}
package com.test.units;
import com.kaka.notice.AsynResult;
import com.kaka.notice.Command;
import com.kaka.notice.IResult;
import com.kaka.notice.Message;
import com.kaka.notice.SyncResult;
import com.kaka.notice.annotation.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author zkpursuit
*/
@Handler(cmd = "10000", type = String.class)
@Handler(cmd = "20000", type = String.class)
public class ResultCommand extends Command {
@Override
public void execute(Message msg) {
try {
//模拟耗时操作
Thread.sleep(10000);
} catch (InterruptedException ex) {
Logger.getLogger(ResultCommand.class.getName()).log(Level.SEVERE, null, ex);
}
IResult result = msg.getResult("ResultMsg");
if (result != null) {
//必须设置处理结果
if (result instanceof AsynResult) {
result.set(">>>>>>>>异步执行结果");
} else if (result instanceof SyncResult) {
result.set(">>>>>>>>同步执行结果");
}
}
}
}
package com.test.units;
import com.kaka.notice.Mediator;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.MultiHandler;
/**
*
* @author zkpursuit
*/
@MultiHandler
public class MyMediator extends Mediator {
/**
* 处理感兴趣的事件
*
* @param msg 事件通知
*/
@Override
public void handleMessage(Message msg) {
Object what = msg.getWhat();
String cmd = String.valueOf(what);
switch (cmd) {
case "2000":
System.out.println(MyMediator.class.getTypeName() + " -> handleMessage " + msg.getWhat() + " 绑定的数据:" + msg.getBody());
break;
case "3000":
System.out.println(MyMediator.class.getTypeName() + " -> handleMessage " + msg.getWhat() + " 绑定的数据:" + msg.getBody());
break;
}
}
/**
* 申明感兴趣的事件
*
* @return 感兴趣的事件
*/
@Override
public Object[] listMessageInterests() {
return new Object[]{"2000", "3000"};
}
}
package com.test.units;
import com.kaka.notice.Proxy;
import com.kaka.notice.annotation.Model;
/**
*
* @author zkpursuit
*/
@Model
public class MyProxy extends Proxy {
public void func() {
System.out.println("调用了:" + MyProxy.class.getTypeName() + " -> func方法");
}
}
package com.test.unit;
import com.kaka.notice.Command;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.Handler;
@Handler(cmd = "50000", type = String.class, priority = 1)
public class CallbackCommand1 extends Command {
@Override
public void execute(Message msg) {
this.returnCallbackResult(new Object[]{100, "我爱我家"});
}
}
package com.test.unit;
import com.kaka.notice.Command;
import com.kaka.notice.IResult;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.Handler;
/**
* 模拟切面,执行后
*/
@Handler(cmd = "40000", type = String.class, priority = 3)
public class SimulateAopAfterCommand extends Command {
@Override
public void execute(Message msg) {
IResult<Long> execStartTime = msg.getResult("execStartTime");
long offset = System.currentTimeMillis() - execStartTime.get();
System.out.println("Aop业务执行耗时:" + offset);
}
}
package com.test.unit;
import com.kaka.notice.Command;
import com.kaka.notice.IResult;
import com.kaka.notice.Message;
import com.kaka.notice.SyncResult;
import com.kaka.notice.annotation.Handler;
/**
* 模拟切面,执行前
*/
@Handler(cmd = "40000", type = String.class, priority = 1)
public class SimulateAopBeforeCommand extends Command {
@Override
public void execute(Message msg) {
IResult<Long> execStartTime = new SyncResult<>(); //中间变量亦可使用 ThreadLocal 存储
execStartTime.set(System.currentTimeMillis());
msg.setResult("execStartTime", execStartTime);
}
}
package com.test.unit;
import com.kaka.notice.Command;
import com.kaka.notice.Message;
import com.kaka.notice.annotation.Handler;
/**
* 模拟切面
*/
@Handler(cmd = "40000", type = String.class, priority = 2)
public class SimulateAopCommand extends Command {
@Override
public void execute(Message msg) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Aop业务执行");
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。