4 Star 66 Fork 15

hubert-樂xx / tiny

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 26.23 KB
一键复制 编辑 原始数据 按行查看 历史
xxb 提交于 2024-03-09 12:12 . release 1.4.3

介绍

小巧的java应用微内核框架. 基于 enet 事件环型框架结构

目前大部分高级语言都解决了编程内存自动管理的问题(垃圾回收), 但并没有解决cpu资源自动管理的问题。 基本上都是程序员主动创建线程或线程池,这些线程是否被充分利用了,在应用不同的地方创建是否有必要, 太多的线程是否造成竞争性能损耗。毕竟线程再多,而决定并发的是cpu的个数。

所以需要框架来实现一个智能执行器(线程池):根据应用的忙碌程度自动创建和销毁线程, 即线程池会自己根据排对的任务数和当前池中的线程数的比例判断是否需要新创建线程(和默认线程池的行为不同)。 会catch所有异常, 不会被业务异常给弄死(永动机)。
程序只需通过配置最大最小资源自动适配, 类似现在的数据库连接池。这样既能充分利用线程资源, 也减少线程的空转和线程过多调度的性能浪费。

所以系统性能只由线程池大小属性 sys.exec.corePoolSize=8, sys.exec.maximumPoolSize=16 和 jvm内存参数 -Xmx1024m 控制

框架设计一种轻量级执行器(伪协程): Devourer 来控制执行模式(并发,暂停/恢复,速度等)
上层服务应该只关注怎么组装执行任务,然后提交给 Devourer 或直接给执行线程池。如下图:

Image text

安装教程

<dependency>
    <groupId>cn.xnatural</groupId>
    <artifactId>tiny</artifactId>
    <version>1.4.3</version>
</dependency>

初始化

// 创建一个应用
final AppContext app = new AppContext();

// 添加服务 server1
app.addSource(new ServerTpl("server1") {
    @EL(name = "sys.starting")
    void start() {
        log.info("{} start", name);
    }
});
// 添加自定义服务
app.addSource(new TestService());

// 应用启动(会自动扫描并创建当前包下所有@bean注解的类, 依次触发系统事件)
app.start();

基于事件环型微内核框架结构图

以AppContext#EP为事件中心的挂载服务结构

Image text

系统事件: app.start() 后会依次触发 sys.inited, sys.starting, sys.started

  • sys.inited: 应用始化完成(环境配置, 系统线程池, 事件中心)
  • sys.starting: 通知所有服务启动. 一般为ServerTpl
  • sys.started: 应用启动完成
  • sys.stopping: 应用停止事件(kill pid)

配置

配置文件加载顺序(优先级从低到高):

  • classpath: app.properties, classpath: app-[profile].properties
  • file: ./app.properties, file: ./app-[profile].properties
  • configdir: app.properties, configdir: app-[profile].properties
  • 自定义环境属性配置(重写方法): AppContext#customEnv
  • System.getProperties()
  • 系统属性(-Dconfigname): configname 指定配置文件名. 默认: app
  • 系统属性(-Dprofile): profile 指定启用特定的配置
  • 系统属性(-Dconfigdir): configdir 指定额外配置文件目录
  • 只读取properties文件. 按顺序读取app.properties, app-[profile].properties 两个配置文件
  • 配置文件支持简单的 ${} 属性替换

添加 xnet 服务

### app.properties
xnet.hp=:8080
# 配置集群
# xnet.cluster.master=:5000
app.addSource(new ServerTpl("xnet") { // 添加网络服务
    XNet xnet;
    
    @EL(name = "sched.started")
    void start() {
        xnet = new XNet(attrs(), exec(), bean(Sched.class))
          .setAttr("cluster.name", app().name())
          .setAttr("cluster.id", app().id());
        app().beans().forEachRemaining(e -> xNet.http().chain().resolve(e.getValue()));

        exposeBean(xNet)
        exposeBean(xNet.http())
        exposeBean(xNet.cluster())
        xNet.start()
    }
    
    @EL(name = "sys.stopping")
    void stop() {
        if (xnet != null) xnet.close();
    }
});

添加 xjpa 数据库操作服务

### app.properties
jpa_local.url=jdbc:mysql://localhost:3306/test?useSSL=false&user=root&password=root&allowPublicKeyRetrieval=true
app.addSource(new ServerTpl("jpa_local") { //数据库 jpa_local
    Repo repo;
    
    @EL(name = "sys.starting", async = true)
    void start() {
        repo = new Repo(attrs()).init();
        exposeBean(repo); // 把repo暴露给全局, 即可以通过@Inject注入
        ep.fire(name + ".started");
    }

    @EL(name = "sys.stopping", async = true, order = 2f)
    void stop() { if (repo != null) repo.close(); }
});

添加 sched 时间调度服务

app.addSource(new ServerTpl("sched") {
    Sched sched;
    @EL(name = "sys.starting", async = true)
    void start() {
        sched = new Sched(attrs(), exec()).init();
        exposeBean(sched);
        ep.fire(name + ".started");
    }

    @EL(name = "sched.after")
    void after(Duration duration, Runnable fn) {sched.after(duration, fn);}

    @EL(name = "sys.stopping", async = true)
    void stop() { if (sched != null) sched.stop(); }
});

动态按需添加服务

@EL(name = "sys.inited")
void sysInited() {
    if (!app.attrs("redis").isEmpty()) { //根据配置是否有redis,创建redis客户端工具
        app.addSource(new RedisClient())
    }
}

让系统心跳(即:让系统安一定频率触发事件 sys.heartbeat)

需要用 sched 添加 sched.after 事件监听

@EL(name = "sched.after")
void after(Duration duration, Runnable fn) {sched.after(duration, fn);}

每隔一段时间触发一次心跳, 单位秒

  • 配置(sys.heartbeat): 30~180
// 心跳事件监听器
@EL(name = "sys.heartbeat", async = true)
void myHeart() {
    System.out.println("咚");
}

服务基础类: ServerTpl

推荐所有被加入到AppContext中的服务都是ServerTpl的子类

### app.properties
服务名.prop=1
app.addSource(new ServerTpl("服务名") {
    
    @EL(name = "sys.starting", async = true)
    void start() {
        // 初始化服务
    }
})

bean注入 @Inject(name = "beanName")

注入匹配规则: (已经存在值则不需要再注入)

  1. 如果 @Inject name 没配置

先按 字段类型 和 字段名 匹配, 如无匹配 再按 字段类型 匹配

  1. 则按 字段类型 和 @Inject(name = "beanName") beanName 匹配
app.addSource(new ServerTpl() {
    @Inject Repo repo;  //自动注入

    @EL(name = "sys.started", async = true)
    void init() {
        List<Map> rows = repo.rows("select * from test")
        log.info("========= {}", rows);
    }
});

动态bean获取: 方法 bean(Class bean类型, String bean名字)

app.addSource(new ServerTpl() {
    @EL(name = "sys.started", async = true)
    void start() {
        String str = bean(Repo.class).firstRow("select count(1) as total from test").get("total").toString()
        log.info("=========" + str);
    }
});

bean依赖注入原理

两种bean容器: AppContext是全局bean容器, 每个服务(ServerTpl)都是一个bean容器

获取bean对象: 先从全局查找, 再从每个服务中获取

  • 暴露全局bean
    app.addSource(new TestService());
  • 服务(ServerTpl)里面暴露自己的bean
    Repo repo = new Repo("jdbc:mysql://localhost:3306/test?user=root&password=root").init();
    exposeBean(repo); // 加入到bean容器,暴露给外部使用

属性直通车

服务(ServerTpl)提供便捷方法获取配置.包含: getLong, getInteger, getDouble, getBoolean等

## app.properties
testSrv.prop1=1
testSrv.prop2=2.2
app.addSource(new ServerTpl("testSrv") {
    @EL(name = "sys.starting")
    void init() {
        log.info("print prop1: {}, prop2: {}", getInteger("prop1"), getDouble("prop2"));    
    }
})

对应上图的两种任务执行

异步任务

async(() -> {
    // 异步执行任务
})

创建任务对列

queue("队列名", () -> {
    // 执行任务
})

对列执行器: Devourer

会自旋执行完队列中所有任务
当需要控制任务最多 一个一个, 两个两个... 的执行时
服务基础类(ServerTpl)提供方法创建: queue

添加任务到队列

// 方法1
queue("save", () -> {
    // 执行任务
});
// 方法2
queue("save").offer(() -> {
    // 执行任务
});

队列特性

并发控制

最多同时执行任务数, 默认1(one-by-one)

queue("save").parallel(2)

注: parallel 最好小于 系统最大线程数(sys.exec.maximumPoolSize), 即不能让某一个执行对列占用所有可用的线程

执行速度控制

把任务按速度均匀分配在时间线上执行
支持: 每秒(10/s), 每分(10/m), 每小时(10/h), 每天(10/d)

// 例: 按每分钟执行30个任务的频率
queue("save").speed("30/m")
// 清除速度控制(立即执行)
queue("save").speed(null)

队列 暂停/恢复

// 暂停执行, 一般用于发生错误时
// 注: 必须有新的任务入对, 重新触发继续执行. 或者resume方法手动恢复执行
queue("save")
    .errorHandle {ex, me ->
        // 发生错误时, 让对列暂停执行(不影响新任务入对)
        // 1. 暂停一段时间
        me.suspend(Duration.ofSeconds(180));
        // 2. 条件暂停(每个新任务入队都会重新验证条件)
        // me.suspend(queue -> true);
    };

// 手动恢复执行
// queue("save").resume()

队列最后任务有效

是否只使用队列最后一个, 清除队列前面的任务
适合: 入队的频率比出队高, 前面的任务可有可无

// 例: increment数据库的一个字段的值
Devourer q = queue("increment").useLast(true);
for (int i = 0; i < 20; i++) {
    // 入队快, 任务执行慢, 中间的可以不用执行
    q.offer(() -> repo.execute("update test set count=?", i));
}
// 例: 从服务端获取最新的数据
Devourer q = queue("newer").useLast(true);
// 用户不停的点击刷新
q.offer(() -> {
    Utils.http().get("http://localhost:8080/data/newer").execute();    
})

原理: 并发流量控制锁 LatchLock

当被执行代码块需要控制同时线程执行的个数时

final LatchLock lock = new LatchLock();
lock.limit(3); // 设置并发限制. 默认为1
if (lock.tryLock()) { // 尝试获取一个锁
    try {
        // 被执行的代码块    
    } finally {
        lock.release(); // 释放一个锁
    }
}

数据库操作工具: DB

创建一个数据源

DB repo = new DB("jdbc:mysql://localhost:3306/test", "root", "root", 1, 8);

查询单条记录

repo.row("select * from test order by id desc");

查询多条记录

repo.rows("select * from test limit 10");
repo.rows("select * from test where id in (?, ?)", 2, 7);

查询单个值

// 只支持 Integer.class, Long.class, String.class, Double.class, BigDecimal.class, Boolean.class, Date.class
repo.single("select count(1) from test", Integer.class);

插入一条记录

repo.execute("insert into test(name, age, create_time) values(?, ?, ?)", "方羽", 5000, new Date());

更新一条记录

repo.execute("update test set age = ? where id = ?", 10, 1)

事务

// 执行多条sql语句
repo.trans(() -> {
    // 插入并返回id
    Object id = repo.insertWithGeneratedKey("insert into test(name, age, create_time) values(?, ?, ?)", "方羽", 5000, new Date());
    repo.execute("update test set age = ? where id = ?", 18, id);
    return null;
});

http客户端

// get
Utils.http("http://xnatural.cn:9090/test/cus?p2=2")
    .header("test", "test") // 自定义header
    .cookie("sessionId", "xx") // 自定义 cookie
    .connectTimeout(5000) // 设置连接超时 5秒
    .readTimeout(15000) // 设置读结果超时 15秒
    .param("p1", 1) // 添加参数
    .debug().get();
// post
Utils.http("http://xnatural.cn:9090/test/cus")
    .debug().post();
// post 表单
Utils.http("http://xnatural.cn:9090/test/form")
    .param("p1", "p1")
    .debug().post();
// post 上传文件
Utils.http("http://xnatural.cn:9090/test/upload")
    .param("p1", "xxx")
    .param("file", new File("/tmp/1.txt"))
    .debug().post();
// post 上传文件+json
Utils.http("http://xnatural.cn:9090/test/upload")
    .param("p1", new JSONObject().fluentPut("a", "qqq").fluentPut("b", 1), "application/json")
    .param("file", new File("/tmp/1.txt"))
    .debug().post();
// post json
Utils.http("http://xnatural.cn:9090/test/json")
    .jsonBody(new JSONObject().fluentPut("p1", 1).toString())
    .debug().post();
// post 普通文本
Utils.http("http://xnatural.cn:9090/test/string")
    .textBody("xxxxxxxxxxxxxxxx")
    .debug().post();
// 文件下载
Utils.http("http://localhost:5000/test/download/79975d5cfce74fceb4138932f3873a2d.png").fileHandle(filename -> {
    try {
        return Files.newOutputStream(Paths.get("/tmp/" + (filename == null || filename.isEmpty() ? "tmp" : filename)));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).get();

对象拷贝器

javabean 拷贝到 javabean

Utils.copier(
      new Object() {
          public String name = "徐言";
      }, 
      new Object() {
          private String name;
          public void setName(String name) { this.name = name; }
          public String getName() { return name; }
      }
).build();

对象 转换成 map

Utils.copier(
      new Object() {
          public String name = "方羽";
          public String getAge() { return 5000; }
      }, 
      new HashMap()
).build();

添加额外属性源

Utils.copier(
      new Object() {
          public String name = "云仙君";
      }, 
      new Object() {
          private String name;
          public Integer age;
          public void setName(String name) { this.name = name; }
          public String getName() { return name; }
          
      }
).add("age", () -> 1).build();

忽略属性

Utils.copier(
      new Object() {
          public String name = "徐言";
          public Integer age = 22;
      }, 
      new Object() {
          private String name;
          public Integer age = 33;
          public void setName(String name) { this.name = name; }
          public String getName() { return name; }
          
      }
).ignore("age").build(); // 最后 age 为33

属性值转换

Utils.copier(
      new Object() {
          public long time = System.currentTimeMillis();
      }, 
      new Object() {
          private String time;
          public void setTime(String time) { this.time = time; }
          public String getTime() { return time; }
          
      }
).addConverter("time", o -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) o)))
        .build();

忽略空属性

Utils.copier(
      new Object() {
          public String name;
      }, 
      new Object() {
          private String name = "方羽";
          public void setName(String name) { this.name = name; }
          public String getName() { return name; }
          
      }
).ignoreNull(true).build(); // 最后 name 为 方羽

属性名映射

Utils.copier(
      new Object() {
          public String p1 = "徐言";
      }, 
      new Object() {
          private String pp1 = "方羽";
          public void setPp1(String pp1) { this.pp1 = pp1; }
          public String getPp1() { return pp1; }
          
      }
).mapProp( "p1", "pp1").build(); // 最后 name 为 徐言

文件内容监控器(类linux tail)

Utils.tailer().tail("d:/tmp/tmp.json", 5);

nanoId(长度): nano算法生成动态唯一字符

String id = Utils.nanoId();

ioCopy(输入流, 输出流, 速度)

// 文件copy
try (InputStream is = new FileInputStream("d:/tmp/陆景.png"); OutputStream os = new FileOutputStream("d:/tmp/青子.png")) {
    Utils.ioCopy(is, os);
}

简单缓存: CacheSrv

// 添加缓存服务
app.addSource(new CacheSrv());
## app.properties 缓存最多保存100条数据
cacheSrv.itemLimit=100

缓存操作

// 1. 设置缓存
bean(CacheSrv).set("缓存key", "缓存值", Duration.ofMinutes(30));
// 2. 过期函数
bean(CacheSrv).set("缓存key", "缓存值", record -> {
    // 缓存值: record.value
    // 缓存更新时间: record.getUpdateTime()
    return 函数返回过期时间点(时间缀), 返回null(不过期,除非达到缓存限制被删除);    
});
// 3. 获取缓存
bean(CacheSrv).get("缓存key");
// 4. 获取缓存值, 并更新缓存时间(即从现在开始重新计算过期时间)
bean(CacheSrv).getAndUpdate("缓存key");
// 5. 手动删除
bean(CacheSrv).remove("缓存key");

hash缓存操作

// 1. 设置缓存
bean(CacheSrv).hset("缓存key", "数据key", "缓存值", Duration.ofMinutes(30));
// 2. 过期函数
bean(CacheSrv).hset("缓存key", "数据key", "缓存值", record -> {
    // 缓存值: record.value
    // 缓存更新时间: record.getUpdateTime()
    return 函数返回过期时间点(时间缀), 返回null(不过期,除非达到缓存限制被删除);    
});
// 3. 获取缓存
bean(CacheSrv).hget("缓存key", "数据key");
// 4. 获取缓存值, 并更新缓存时间(即从现在开始重新计算过期时间)
bean(CacheSrv).hgetAndUpdate("缓存key", "数据key");
// 5. 手动删除
bean(CacheSrv).hremove("缓存key", "数据key");

无限递归优化实现: Recursion

解决java无尾递归替换方案. 例:

System.out.println(factorialTailRecursion(1, 10_000_000).invoke());
/**
 * 阶乘计算
 * @param factorial 当前递归栈的结果值
 * @param number 下一个递归需要计算的值
 * @return 尾递归接口,调用invoke启动及早求值获得结果
 */
Recursion<Long> factorialTailRecursion(final long factorial, final long number) {
    if (number == 1) {
        // new Exception().printStackTrace();
        return Recursion.done(factorial);
    }
    else {
        return Recursion.call(() -> factorialTailRecursion(factorial + number, number - 1));
    }
}

备忘录模式:提升递归效率. 例:

System.out.println(fibonacciMemo(47));
/**
 * 使用同一封装的备忘录模式 执行斐波那契策略
 * @param n 第n个斐波那契数
 * @return 第n个斐波那契数
 */
long fibonacciMemo(long n) {
    return Recursion.memo((fib, number) -> {
        if (number == 0 || number == 1) return 1L;
        return fib.apply(number-1) + fib.apply(number-2);
    }, n);
}

延迟对象: Lazier

封装是一个延迟计算值(只计算一次)

final Lazier<String> _id = new Lazier<>(() -> {
    String id = getHeader("X-Request-ID");
    if (id != null && !id.isEmpty()) return id;
    return UUID.randomUUID().toString().replace("-", "");
});
  • 延迟获取属性值
    final Lazier<String> _name = new Lazier<>(() -> getAttr("sys.name", String.class, "app"));
  • 重新计算
    final Lazier<Integer> _num = new Lazier(() -> new Random().nextInt(10));
    _num.get();
    _num.clear(); // 清除重新计算
    _num.get();

时间段统计工具: Counter

// 按小时统计
Counter counter = new Counter("MM-dd HH", (time, count) -> {
    // time: 指具体某个小时
    // count: 指具体某个小时的统计个数
});
counter.increment(); // 当前小时统计加一

应用例子

最佳实践: Demo(java) , Demo(scala) , GRule(groovy)

1.4.4 ing

  • feat: Httper 代理
  • feat: 空闲任务
  • feat: 增加日志级别配置
  • feat: Httper 工具支持 websocket
  • feat: 自定义注解

参与贡献

xnatural@msn.cn

Java
1
https://gitee.com/xnat/tiny.git
git@gitee.com:xnat/tiny.git
xnat
tiny
tiny
master

搜索帮助