验证中...
OrderMonitior.java
Raw Copy
package com.adj.amateur.redis;
import java.util.concurrent.CopyOnWriteArrayList;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import com.adj.amateur.entity.MemberInfo;
import com.adj.amateur.tool.Constants;
import com.alibaba.fastjson.JSON;
import redis.clients.jedis.Jedis;
//容器初始化后--执行相关操作
@Component
public class OrderMonitior implements InitializingBean{
private static Jedis jedis=JedisClient.getJedis(Constants.REDIS_EXPIRE_LISTENER);
public OrderMonitior() {
super();
// TODO Auto-generated constructor stub
}
private static final String MIN_SCORE = "0";
private static final String OFFSET = "0";
private static final String LIMIT = "10";
private String key="redis-delayed_key";
//@1 get
private static final String SCRIPT = "local table = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2]);\n" +
"\n" +
"local key = table[1];\n" +
"\n" +
"if key == nil then\n" +
"\treturn nil;\n" +
"else\n" +
"\tredis.call('ZREM', KEYS[1], key);\n" +
"\treturn key;\n" +
"end";//原子操作解决多实例部署不一致情况
//@2 set
private static final String enqueue = "local zset_key = KEYS[1]\r\n" +
"local hash_key = KEYS[2]\r\n" +
"local zset_value = ARGV[1]\r\n" +
"local zset_score = ARGV[2]\r\n" +
"local hash_field = ARGV[3]\r\n" +
"local hash_value = ARGV[4]\r\n" +
"redis.call('ZADD', zset_key, zset_score, zset_value)\r\n" +
"redis.call('HSET', hash_key, hash_field, hash_value)\r\n" +
"return nil";//使用zadd hset 放入redis有序队列中
//@2 get,remove
private static final String dequeue = "local zset_key = KEYS[1]\r\n" +
"local hash_key = KEYS[2]\r\n" +
"local min_score = ARGV[1]\r\n" +
"local max_score = ARGV[2]\r\n" +
"local offset = ARGV[3]\r\n" +
"local limit = ARGV[4]\r\n" +
"-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代\r\n" +
"local status, type = next(redis.call('TYPE', zset_key))\r\n" +
"if status ~= nil and status == 'ok' then\r\n" +
" if type == 'zset' then\r\n" +
" local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)\r\n" +
" if list ~= nil and #list > 0 then\r\n" +
" -- unpack函数能把table转化为可变参数\r\n" +
" redis.call('ZREM', zset_key, unpack(list))\r\n" +
" local result = redis.call('HMGET', hash_key, unpack(list))\r\n" +
" redis.call('HDEL', hash_key, unpack(list))\r\n" +
" return result\r\n" +
" end\r\n" +
" end\r\n" +
"end\r\n" +
"return nil";//参考jesque的部分Lua脚本实现 - TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代 -- unpack函数能把table转化为可变参数
/**
* 守护线程
*/
private Thread mainThread;
public void init(){
mainThread = new Thread(() -> execute());
mainThread.setDaemon(true);
mainThread.setName("守护线程-执行订单关闭-->");
mainThread.start();
}
public void execute() {
while(true) {
/*String result=poll();
if (StringUtils.isNotBlank(result)) {
System.out.println("过期未处理key+++++++++++"+result);
jedis.zrem(key,result);
}*/
List<MemberInfo> memberInfos= dequeue();
if (!CollectionUtils.isEmpty(memberInfos)) {
System.out.println(memberInfos);
}
}
}
public String poll() {//取出元素
List<String> argvs = new ArrayList<String>();
argvs.add("0");
argvs.add(String.valueOf(Instant.now().toEpochMilli()));
Object result = jedis.eval(SCRIPT, Collections.singletonList(key), argvs);
if (Objects.isNull(result)) {
return null;
}
return String.valueOf(result);
}
private static final String ORDER_QUEUE = "ORDER_QUEUE";
private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
private static final List<String> KEYS = new CopyOnWriteArrayList<String>();
static {
KEYS.add(ORDER_QUEUE);
KEYS.add(ORDER_DETAIL_QUEUE);
}
public static void queue(MemberInfo memberInfo) {
// TODO Auto-generated method stub
List<String> args=new CopyOnWriteArrayList<String>();
args.add(memberInfo.getMemberId().toString());
args.add(String.valueOf( System.currentTimeMillis()));
args.add(memberInfo.getMemberId().toString());
args.add(JSON.toJSONString(memberInfo));
jedis.eval(enqueue,KEYS, args);
}
public List<MemberInfo> dequeue() {
String maxScore = String.valueOf(System.currentTimeMillis() - 5 * 60 * 1000);//30分钟之前
return deque(MIN_SCORE,maxScore,OFFSET,LIMIT);
}
public List<MemberInfo> deque(String min, String max, String offset, String limit) {
List<String> args=new CopyOnWriteArrayList<String>();
args.add(min);
args.add(max);
args.add(offset);
args.add(limit);
List<MemberInfo> members = new CopyOnWriteArrayList<MemberInfo>();
List<String> eval= (List<String>) jedis.eval(dequeue,KEYS,args);
if (!CollectionUtils.isEmpty(eval)) {
for (String expr : eval) {
members.add(JSON.parseObject(expr, MemberInfo.class));
}
}
return members;
}
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
init();
}
}

Comment list( 0 )

You need to Sign in for post a comment

Help Search