TimingWheel 时间轮算法是如何实现的?( 三 )

  1. 时间轮运行的时候首先会记录一下启动时间(startTime),然后调用startTimeInitialized释放外层的等待线程;
  2. 进入dowhile循环,调用waitForNextTick睡眠等待到下一次的tick指针的跳动,并返回当前时间减去startTime作为deadline
  3. 由于mask= wheel.length -1,wheel是2的次方数,所以可以直接用tick & mask 计算出此次在wheel中的槽位
  4. 调用processCancelledTasks将cancelledTimeouts队列中的任务取出来,并将当前的任务从时间轮中移除
  5. 调用transferTimeoutsToBuckets方法将timeouts队列中缓存的数据取出加入到时间轮中
  6. 运行目前指针指向的槽位中的bucket链表数据
时间轮指针跳动waitForNextTick//sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长private long waitForNextTick() {//tickDuration这里是100000//tick表示总tick数long deadline = tickDuration * (tick + 1);for (;;) {final long currentTime = System.nanoTime() - startTime;// 计算需要sleep的时间, 之所以加999999后再除10000000,前面是1所以这里需要减去1,// 才能计算准确,还有通过这里可以看到 其实线程是以睡眠一定的时候再来执行下一个ticket的任务的,//这样如果ticket的间隔设置的太小的话,系统会频繁的睡眠然后启动,//其实感觉影响部分的性能,所以为了更好的利用系统资源步长可以稍微设置大点long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//sleepTimeMs小于零表示走到了下一个时间轮位置if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// Check if we run on windows, as if thats the case we will need// to round the sleepTime as workaround for a bug that only affect// the JVM if it runs on windows.//// See https://github.com/netty/netty/issues/356if (Platform.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}可以想象一下在时钟的秒钟上面秒与秒之间的时间是需要等待的,那么waitForNextTick这个方法就是根据当前的时间计算出跳动到下个时间的间隔时间,并进行sleep操作,然后返回当前时间距离时间轮启动时间的时间段 。
转移任务到时间轮中在调用时间轮的方法加入任务的时候并没有直接加入到时间轮中,而是缓存到了timeouts队列中,所以在运行的时候需要将timeouts队列中的任务转移到时间轮数据的链表中
transferTimeoutsToBucketsprivate void transferTimeoutsToBuckets() {// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just// adds new timeouts in a loop.// 每次tick只处理10w个任务,以免阻塞worker线程for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}//已经被取消了;if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}//calculated = tick 次数long calculated = timeout.deadline / tickDuration;// 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期timeout.remainingRounds = (calculated - tick) / wheel.length;//如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.//// 算出任务应该插入的 wheel 的 slot, slotIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];//将timeout加入到bucket链表中bucket.addTimeout(timeout);}}在这个转移方法中,写死了一个循环,每次都只转移10万个任务 。
然后根据HashedWheelTimeout的deadline延迟时间计算出时间轮需要运行多少次才能运行当前的任务,如果当前的任务延迟时间大于时间轮跑一圈所需要的时间,那么就计算需要跑几圈才能到这个任务运行 。
最后计算出该任务在时间轮中的槽位,添加到时间轮的链表中 。
运行时间轮中的任务当指针跳到时间轮的槽位的时间,会将槽位的HashedWheelBucket取出来,然后遍历链表,运行其中到期的任务 。
expireTimeouts// 过期并执行格子中的到期任务,tick到该格子的时候,worker线程会调用这个方法//根据deadline和remainingRounds判断任务是否过期public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// process all timeouts//遍历格子中的所有定时任务while (timeout != null) {// 先保存next,因为移除后next将被设置为nullHashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {//从bucket链表中移除当前timeout,并返回链表中下一个timeoutnext = remove(timeout);//如果timeout的时间小于当前的时间,那么就调用expire执行taskif (timeout.deadline <= deadline) {timeout.expire();} else {//不可能发生的情况,就是说round已经为0了,deadline却>当前槽的deadline// The timeout was placed into a wrong slot. This should never hAppen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {//因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一timeout.remainingRounds--;}//把指针放置到下一个timeouttimeout = next;}}


推荐阅读