3 Star 0 Fork 0

eric_1989 / eric_article

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
AutoPromotePriorityQueue.java 11.32 KB
一键复制 编辑 原始数据 按行查看 历史
eric_1989 提交于 2020-01-10 19:56 . 优先级自动晋升线程池实现
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Work queue for a {@link ThreadPoolExecutor} whose waiting tasks are promoted automatically.
 *
 * <p>Every task is filed under an absolute <em>slot</em> number {@code write + priority}; consumers
 * scan slots in ascending order starting at {@code read} and drain each slot first-in-first-out.
 * {@code write} is incremented once every {@code promoteShift + 1} successful takes, so tasks that
 * are already queued gradually outrank newly arriving tasks of a nominally higher priority.
 *
 * <p>Only {@link #offer(Runnable)}, {@link #add(Runnable)}, {@link #put(Runnable)},
 * {@link #take()} and {@link #poll(long, TimeUnit)} are implemented; {@link #toArray(Object[])}
 * returns an empty array and the remaining queue operations throw
 * {@link UnsupportedOperationException}. Every element must implement {@link PriorityTask}.
 *
 * <p>NOTE(review): {@code size()} is unsupported, and {@code isEmpty()} is not overridden here, so
 * it presumably falls back to {@code size()} and fails as well. {@code ThreadPoolExecutor}
 * appears to consult {@code isEmpty()} while shutting down - confirm before calling
 * {@code shutdown()} on a pool that uses this queue.
 *
 * <p>NOTE(review): the chunk protocol is lock-free and has not been proven linearizable. A
 * producer that computed its slot from stale counters can still race with a consumer that moves
 * {@code read} past a slot for which no chunk exists yet - verify before production use.
 */
public class AutoPromotePriorityQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable>
{
    final int numOfSlot = 1 << 4;
    // Mask that maps an absolute slot number onto an index of the slot array.
    final int slotShift = numOfSlot - 1;
    // Largest accepted priority offset (priorities are clamped into [0, lowestPriority]); it is
    // also the furthest distance read may run ahead of write.
    final int lowestPriority = 5;
    // write is advanced whenever (readCount & promoteShift) == 0, i.e. once every 2 takes.
    final int promoteShift = (1 << 1) - 1;
    AtomicReferenceArray<Chunk> slots = new AtomicReferenceArray<>(numOfSlot);
    AtomicInteger read = new AtomicInteger();
    AtomicInteger write = new AtomicInteger();
    AtomicReferenceFieldUpdater<Chunk, Chunk> nextOfChunkUpdater = AtomicReferenceFieldUpdater.newUpdater(Chunk.class, Chunk.class, "next");
    AtomicIntegerFieldUpdater<Chunk> stateOfChunkUpdater = AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "state");
    final Lock lock = new ReentrantLock();
    final Condition notEmpty = lock.newCondition();
    // Number of tasks handed out so far; drives the periodic increment of write.
    AtomicInteger readCount = new AtomicInteger();

    static final int using = 0;
    static final int shutdown = 1;

    /** Creates an empty queue with one chunk per array index, numbered 0 to numOfSlot - 1. */
    public AutoPromotePriorityQueue()
    {
        for (int i = 0; i < slots.length(); i++)
        {
            slots.set(i, new Chunk(i));
        }
    }

    /** Bucket holding every task filed under one absolute slot number. */
    class Chunk
    {
        // Absolute slot number served by this chunk.
        final int priority;
        // Next chunk that maps onto the same array index, or null.
        volatile Chunk next;
        final Deque<Runnable> deque = new ConcurrentLinkedDeque<>();
        // using until a consumer retires the chunk, shutdown afterwards.
        volatile int state = using;

        Chunk(int priority)
        {
            this.priority = priority;
        }
    }

    /** Task that carries its own priority offset and is told which slot it was filed under. */
    static interface PriorityTask extends Runnable
    {
        /** @return the priority offset; 0 is the most urgent. */
        int priority();

        /** @param write the absolute slot number the task was filed under. */
        void setWrite(int write);
    }

    /** Not supported. */
    @Override
    public Iterator<Runnable> iterator()
    {
        throw new UnsupportedOperationException();
    }

    /** Always returns an empty array; the queued tasks are not exposed. */
    @Override
    public Object[] toArray(Object[] a)
    {
        return new Object[0];
    }

    /** Not supported. */
    @Override
    public int size()
    {
        throw new UnsupportedOperationException();
    }

    /** Same as {@link #offer(Runnable)}. */
    @Override
    public boolean add(Runnable task)
    {
        return offer(task);
    }

    /**
     * Files the task under slot {@code write + priority} and wakes one waiting consumer.
     *
     * @param task the task to queue; must implement {@link PriorityTask}
     * @return always {@code true}
     * @throws ClassCastException if the task is not a {@link PriorityTask}
     * @throws NullPointerException if the task is {@code null}
     */
    @Override
    public boolean offer(Runnable task)
    {
        return enqueue(task, Integer.MIN_VALUE);
    }

    /**
     * Files the task under the slot derived from its priority, but never below {@code floor}.
     */
    private boolean enqueue(Runnable task, int floor)
    {
        PriorityTask prioritized = (PriorityTask) task;
        // Offsets above lowestPriority would land beyond the furthest slot a consumer may scan.
        int offset = Math.min(Math.max(prioritized.priority(), 0), lowestPriority);
        int base;
        for (;;)
        {
            base = write.get();
            int consumed = read.get();
            if (consumed <= base)
            {
                break;
            }
            // Consumers ran ahead while the queue was empty: slots below read are never visited
            // again, so lift write to read before computing the target slot.
            if (write.compareAndSet(base, consumed))
            {
                base = consumed;
                break;
            }
        }
        int slot = Math.max(base + offset, floor);
        prioritized.setWrite(slot);
        doOffer(task, findOrCreateChunk(slot));
        lock.lock();
        try
        {
            notEmpty.signal();
        }
        finally
        {
            lock.unlock();
        }
        return true;
    }

    /**
     * Walks the chain at the slot's array index and returns the chunk numbered {@code slot},
     * appending a new chunk at the tail when the chain does not contain one.
     */
    private Chunk findOrCreateChunk(int slot)
    {
        Chunk cursor = slots.get(slot & slotShift);
        while (cursor.priority != slot)
        {
            Chunk successor = cursor.next;
            if (successor == null)
            {
                // Whether this CAS wins or loses, cursor.next is non-null afterwards.
                nextOfChunkUpdater.compareAndSet(cursor, null, new Chunk(slot));
                successor = cursor.next;
            }
            cursor = successor;
        }
        return cursor;
    }

    /**
     * Appends the task to the chunk. If a consumer has retired the chunk in the meantime, every
     * task still in it is moved to a later slot so that it is not stranded behind {@code read}.
     */
    private void doOffer(Runnable task, Chunk chunk)
    {
        chunk.deque.offerLast(task);
        if (chunk.state == shutdown)
        {
            Runnable displaced;
            while ((displaced = chunk.deque.poll()) != null)
            {
                // Re-filing strictly after the retired slot keeps this from looping on one slot.
                enqueue(displaced, chunk.priority + 1);
            }
        }
    }

    /** Same as {@link #offer(Runnable)}; the queue is unbounded, so this never blocks. */
    @Override
    public void put(Runnable task) throws InterruptedException
    {
        offer(task);
    }

    /** Not supported. */
    @Override
    public boolean offer(Runnable task, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    @Override
    public PriorityTask poll()
    {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    @Override
    public PriorityTask peek()
    {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    @Override
    public boolean addAll(Collection<? extends Runnable> c)
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Removes and returns the next task, waiting until one is available.
     *
     * @return the next task, never {@code null}
     * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
     *         flag is restored before the exception is thrown
     */
    public Runnable take()
    {
        Runnable priorityTask = innerTake();
        if (priorityTask != null)
        {
            return priorityTask;
        }
        lock.lock();
        try
        {
            // Re-check under the lock: producers signal under the same lock after enqueueing.
            while ((priorityTask = innerTake()) == null)
            {
                notEmpty.await();
            }
            return priorityTask;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * Scans forward from {@code read} and removes the first task found.
     *
     * @return the task, or {@code null} when nothing was found up to slot
     *         {@code write + lowestPriority}
     */
    private Runnable innerTake()
    {
        for (;;)
        {
            int r = read.get();
            int index = r & slotShift;
            Chunk chunk = slots.get(index);
            if (chunk.priority < r)
            {
                Chunk successor = chunk.next;
                if (successor != null)
                {
                    // The head chunk belongs to an earlier lap; unlink it and look again.
                    slots.compareAndSet(index, chunk, successor);
                    continue;
                }
                if (!advanceRead(r))
                {
                    return null;
                }
                continue;
            }
            if (chunk.priority > r)
            {
                if (!advanceRead(r))
                {
                    return null;
                }
                continue;
            }
            Runnable task = chunk.deque.poll();
            if (task != null)
            {
                int current = readCount.incrementAndGet();
                if ((current & promoteShift) == 0)
                {
                    write.incrementAndGet();
                }
                return task;
            }
            if (chunk.state == shutdown)
            {
                Chunk successor = chunk.next;
                if (successor != null)
                {
                    slots.compareAndSet(index, chunk, successor);
                }
                if (!advanceRead(r))
                {
                    return null;
                }
                continue;
            }
            if (r >= write.get() + lowestPriority)
            {
                // read cannot move past this slot yet, so the chunk stays open for producers.
                return null;
            }
            // Retire the chunk; the next iteration polls it once more before moving on, which
            // picks up a task that slipped in just before the state change.
            stateOfChunkUpdater.compareAndSet(chunk, using, shutdown);
        }
    }

    /**
     * Tries to move {@code read} from {@code r} to {@code r + 1}.
     *
     * @return {@code false} when {@code r} has reached {@code write + lowestPriority} and must
     *         not advance; {@code true} otherwise, even if another thread advanced it first
     */
    private boolean advanceRead(int r)
    {
        int wBarrier = write.get() + lowestPriority;
        if (r < wBarrier)
        {
            read.compareAndSet(r, r + 1);
            return true;
        }
        return false;
    }

    /**
     * Removes and returns the next task, waiting up to the given time for one to arrive.
     *
     * @param timeout how long to wait before giving up
     * @param unit unit of {@code timeout}
     * @return the next task, or {@code null} if the wait time elapsed first
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        Runnable priorityTask = innerTake();
        if (priorityTask != null)
        {
            return priorityTask;
        }
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try
        {
            while ((priorityTask = innerTake()) == null)
            {
                if (nanos <= 0L)
                {
                    return null;
                }
                // awaitNanos returns the time still left, so spurious wakeups keep the deadline.
                nanos = notEmpty.awaitNanos(nanos);
            }
            return priorityTask;
        }
        finally
        {
            lock.unlock();
        }
    }

    /** Not supported. */
    @Override
    public int remainingCapacity()
    {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    @Override
    public int drainTo(Collection<? super Runnable> c)
    {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    @Override
    public int drainTo(Collection<? super Runnable> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    // Sleep time in milliseconds used by the demo tasks.
    static long time = 100;

    /** Demo task with priority 0 that sleeps twice, counts the latch down and prints its slot. */
    static class HighTask implements PriorityTask
    {
        final int seq;
        CountDownLatch countDownLatch;
        int write;

        HighTask(int seq, CountDownLatch countDownLatch)
        {
            this.seq = seq;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public int priority()
        {
            return 0;
        }

        @Override
        public void setWrite(int write)
        {
            this.write = write;
        }

        @Override
        public void run()
        {
            try
            {
                // Give the low-priority tasks enough time to enter the queue.
                Thread.sleep(time );
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            try
            {
                Thread.sleep(time);
                countDownLatch.countDown();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("高优先级任务" + hashCode() + "执行完毕,槽位:" + write + ",序号:" + seq);
        }
    }

    /** Demo task with priority 1 that counts the latch down, sleeps once and prints its slot. */
    static class LowTask implements PriorityTask
    {
        final int seq;
        CountDownLatch countDownLatch;
        private int write;

        LowTask(int seq, CountDownLatch latch)
        {
            this.seq = seq;
            countDownLatch = latch;
        }

        @Override
        public int priority()
        {
            return 1;
        }

        @Override
        public void setWrite(int write)
        {
            this.write = write;
        }

        @Override
        public void run()
        {
            countDownLatch.countDown();
            try
            {
                Thread.sleep(time);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println("低任务" + hashCode() + "执行完毕,槽位:" + write + ",序号:" + seq);
        }
    }

    /**
     * Demo: fills a 7-thread pool with high- and low-priority tasks, then submits more
     * high-priority tasks once the latch opens. The pool is never shut down, so the JVM keeps
     * running after the last task has printed.
     */
    public static void main(String[] args) throws InterruptedException
    {
        AtomicInteger count = new AtomicInteger();
        int size = 7;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 1, TimeUnit.SECONDS, new AutoPromotePriorityQueue());
        // ThreadPoolExecutor hands its first tasks straight to new worker threads instead of the
        // queue, so `size` extra tasks are submitted just to occupy the workers.
        CountDownLatch latch = new CountDownLatch(1+size);
        for (int i = 0; i < 7+size; i++)
        {
            threadPoolExecutor.execute(new HighTask(count.getAndIncrement(), latch));
        }
        for (int i = 0; i < 5; i++)
        {
            threadPoolExecutor.execute(new LowTask(count.getAndIncrement(), latch));
        }
        // Simulates low-priority tasks that are still waiting but have already been promoted
        // while high-priority tasks run; high-priority tasks submitted now must not overtake them.
        latch.await();
        System.out.println("重新添加新任务");
        for (int i = 0; i < 5; i++)
        {
            threadPoolExecutor.execute(new HighTask(count.getAndIncrement(), latch));
        }
    }
}
1
https://gitee.com/eric_ds/eric_article.git
git@gitee.com:eric_ds/eric_article.git
eric_ds
eric_article
eric_article
master

搜索帮助