如何实现一个定时器?看这一篇就够了

沙海
沙海
沙海
718
文章
2
评论
2021年3月31日03:39:23
评论
2 12878字阅读42分55秒
摘要

速读摘要

速读摘要

本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表,时间轮实现定时器的思路和方法。接下来将介绍分别用跳表、红黑树、时间轮来实现定时器。时间的流逝,任务不断从上层流下下一层,最终到达秒针轮上,当秒针走到时执行。wheel)**即保存着最近将要执行的任务,随着时间的流逝,任务会慢慢的从上层流到秒针轮中进行执行。其中time为32位无符号整数,记录时间片对应数组near[256],表示即将到来的定时任务,t[4][64],表示较为遥远的定时任务。

原文约 4543 | 图片 4 | 建议阅读 10 分钟 | 评价反馈

如何实现一个定时器?看这一篇就够了

程序IT圈

以下文章来源于程序员不是码农,作者程序员不是码农

如何实现一个定时器?看这一篇就够了

程序员不是码农

985硕,大厂经历,专注技术,热衷分享,欢迎交流~

本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表红黑树时间轮实现定时器的思路和方法。

定时器作用

定时器在各种场景都需要用到,比如游戏Buff实现,Redis中的过期任务Linux中的定时任务等等。顾名思义,定时器的主要用途是执行定时任务

定时器数据结构选取

定时器数据结构要求

  • 需要快速找到到期任务,因此,应该具有有序性

  • 过期执行插入(添加定时任务)和删除(取消定时任务)的频率比较高,三种操作效率必须保证

以下为各数据结构时间复杂度表现

有序链表:插入O(n),删除O(1),过期expire执行O(1)

最小堆:插入O(logn),删除O(logn),过期expire执行O(1)

红黑树:插入O(logn),删除O(logn),过期expire执行O(logn)

哈希表+链表(时间轮):插入O(1),删除O(1),过期expire平均执行O(1)(最坏为O(n)

不同开源框架定时器实现方式不一,如,libuv采用最小堆来实现,nginx采用红黑树实现,linux内核和skynet采用时间轮算法实现等等。

定时器接口封装

作为定时器,需要封装以下4类接口给用户使用:

  • 创建定时器init_timer

  • 添加定时任务add_timer

  • 取消定时任务cancel_timer

  • 执行到期任务expire_timer

其中执行到期任务有两种工作方式:

  1. 轮询: 每隔一个时间片去查找哪些任务到期

  2. 睡眠/唤醒:不停查找deadline最近任务,到期执行,否则sleep;sleep期间,任务有改变,线程会被唤醒

接下来将介绍分别用跳表红黑树时间轮来实现定时器。

跳表实现定时器

跳表简介

跳表是一种动态的数据结构,采用空间换时间的思想,在有序链表基础上加入多级索引,通过索引进行二分快速查找,支持快速删除、插入和查找操作(平均时间复杂度为O(logN),最坏为O(N)),效率可与平衡树媲美,实现比其简单。

下面通过一张图来简单说明跳表操作。跳表的最底层即为基本的有序链表,存储所有的数据,可理解为数据层;往上则为索引层,理想状态下,上一层为下一层节点数的一半。比如,要查找下图的数据为11的节点,从begin''出发,向走,如果下一个节点大于11则往走,直到找到目标节点。可见,跳表要比原始链表少比较一些节点,但前提是需要花更多空间存储索引节点。

如何实现一个定时器?看这一篇就够了

image-20210323182236910

跳表实现定时器

  • 跳表查找,插入,删除(任意节点、头节点)的时间复杂度大概率趋向于O(logn)

  • 过期任务查找,只需要跟第一个节点比较,因其第一个节点即为最小节点

学会吸取开源框架中优秀数据结构和代码思想,直接采用redis跳表结构的实现,取出所需部分,用于实现定时器。如下:

跳表数据结构

跳表节点与跳表结构

/*skiplist.h*/#define ZSKIPLIST_MAXLEVEL 32#define ZSKIPPLIST 0.25typedef struct zskiplistNode zskiplistNode;typedef void (*handler_pt) (zskiplistNode * node);// 跳表节点struct zskiplistNode {  unsigned long score;  /*用于排序的值*/  handler_pt handler;  /*处理函数*/  struct zskiplistLevel {    struct zskiplistNode **forward;  }level[];};// 跳表结构typedef struct zskiplist {  struct zskiplistNode * header;  int length;  int level;  /*跳表层数*/}zskiplist;

跳表接口申明

具体接口实现细节请移步redis源码。

/*skiplist.h*//*创建跳表,初始化*/zskiplist *zslCreate(void);/*删除跳,表释放资源*/void zslFree(zskiplist *zsl);/*插入节点*/zskiplistNode *zslInsert(zskiplist *zsl, unsigned long score, handler_pt func);/*删除头节点*/void zsklDeleteHead(zskiplist *zsl);/*删除任意节点*/void zslDelete(zskiplist *zsl, zskplistNode *zn);/*打印,调试*/void zslPrint(zskiplist *zsl);

定时器接口实现

主要介绍四个接口实现:初始化定时器,添加定时任务,删除/取消定时任务,处理定时任务

// test_user.c  封装给用户使用的接口static uint32_tcurrent_time() { uint32_t t;    struct timespec ti;    clock_getttime(CLOCK_MONOTONIC, &ti);    t = (uint32_t)ti.tv_sec * 1000;    t += ti.tv_sec / 1000000;}zskiplist *init_timer() {    // 初始化定时器    return zslCreate();}zskiplistNode *add_timer(zskiplist *zsl, uint32_t msec, handler_pt func) {    // 添加定时任务    msec += current_time();    return zslInsert(zsl, msec, func);}void cancel_timer(zskiplist *zsl, zskiplistNode *zn) {    // 删除/取消定时任务    zslDelete(zsl, zn);}void expire_timer(zskiplist *zsl){    // 处理定时任务    zskiplistNode *x;    uint32_t now = current_time();    for (;;) {        x = zslMin(zsl);  // 最近节点        if (!x) break;        if (x->score > now)  break;  // 时间未到        x->handler(x);  // 执行相关定时任务        zslDeleteHead(zsl);  // 执行完删除    }}

红黑树实现定时器

红黑树

红黑树是一种自平衡二叉查找树,即,插入和删除操作如果破坏树的平衡时,需要重新调整达到平衡状态。因此,是一种比较难的数据结构。

红黑树五条性质

  • 每个节点要么是红色,要么是黑色

  • 根节点是黑色

  • 每个叶子结点是黑色

  • 每个红节点的两个子节点一定是黑色

  • 任意一节点到每个叶子节点的路径都含相同数目的黑结点

弄懂红黑树如何调整树的平衡,保证满足这5条性质,是比较麻烦,需要耐心的去推导一遍,此处不展开。

红黑树实现定时器

AVL 树平衡要求太高,维护平衡操作过多,较复杂;红黑树只需维护一个黑高度,效率较高

红黑树查找删除添加时间复杂度为:O(log(n))

吸取开源框架中优秀数据结构和代码思想,选用nginx中的红黑树结构

红黑树数据结构

红黑树节点与红黑树

// rbtree.h  红黑树数据结构以及相关接口,具体接口实现同上#ifndef _NGX_RBTREE_H_INCLUDE_#define _NGX_RBTREE_H_INCLUDE_typedef unsigned int ngx_rbtree_key_t;typedef unsigned int ngx_uint_t;typedef int ngx_rbtree_key_int_t;// 红黑树节点typedef struct ngx_rbtree_node_s  ngx_rbtree_node_t;struct ngx_rbtree_node_s {    ngx_rbtree_key_t key;    ngx_rbtree_node_t *left;    ngx_rbtree_node_t *right;    ngx_rbtree_node_t *parent;    u_char    color;  // 节点颜色    u_char    data;  // 节点数据};// 插入函数指针typedef void (*ngx_rbtree_insert_pt) (ngx_rbtree_node_t *root,     ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);// 红黑树typedef struct ngx_rbtree_s ngx_rbtree_t;struct ngx_rbtree_s {    ngx_rbtree_node_t  *root;    ngx_rbtree_node_t  *sentinel;    ngx_rbtree_insert_pt insert;};

红黑树接口声明

// 红黑树初始化#define ngx_rbtree_init(tree, s, i)       \ ngx_rbtree_sentinel_init(s);      \ (tree)->root = s;        \ (tree)->sentinel = s;       \ (tree)->insert = i;        // 插入操作void ngx_rbtree_insert(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);// 删除操作void ngx_rbtree_delete(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);// 插入valuevoid ngx_rbtree_insert_value(ngx_rbtree_node_t *root, ngx_rbtree_node_t *node,                            ngx_rbtree_node_t *sentinel);// 插入timervoid ngx_rbtree_insert_timer_value(ngx_rbtree_node_t *root,                                  ngx_rbtree_node_t *node,                                  ngx_rbtree_node_t *sentinel);// 获取下一个节点ngx_rbtree_node_t *ngx_rbtree_next(ngx_rbtree_t *tree, ngx_rbtree_node_t *node);#define ngx_rbt_red(node)    ((node)->color = 1)#define ngx_rbt_black(node)    ((node)->color = 0)#define ngx_rbt_is_red(node)   ((node)->color)#define ngx_rbt_is_black(node)   (!ngx_rbt_is_red(node))#define ngx_rbt_copy_color(n1, n2)  (n1->color = n2->color)#define ngx_rbtree_sentinel_init(node)  ngx_rbt_black(node)// 找到最小值,一直往左走即可static inline ngx_rbtree_node_t *ngx_rbtree_min(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel){    while (node->left != sentinel){        node = node->left;    }    return node;}

定时器接口实现

// test_user.c  封装给用户使用的接口ngx_rbtree_t     timer;static ngx_rbtree_node_t   sentinel;typedef struct timer_entry_s timer_entry_t;typedef void (*timer_handler_pt)(timer_entry_t *ev);struct timer_entry_s {   ngx_rbtree_node_t timer;    timer_handler_pt  handler;};// 初始化int init_timer() {    ngx_rbtree_init(&timer, &sentinel, ngx_rbtree_insert_timer_value);    return 0;}// 添加定时任务void add_timer(timer_entry_t *te, uint32_t msec) {    msec += current_time();    te->timer.key = msec;    ngx_rbtree_insert(&timer, &te->timer);}// 取消定时void cancel_timer(timer_entry_t *te) {    ngx_rbtree_delete(&timer, &te->timer);}// 执行到期任务void expire_timer() {    timer_entry_t *te;    ngx_rbtree_node_t *sentinel, *root, *node;    sentinel = timer.sentinel;    uint32_t now = current_time();    for(;;){        root = timer.root;        if (root == sentinel) break;        if (node->key > now) break;        te = (timer_entry_t *) ((char *) node - offsetof(timer_entry_t, timer));        te->handler(te);        ngx_rbtree_delete(&timer, &te->timer);        free(te);    }}

以上,为红黑树跳表实现的定时器多线程环境下加锁粒度比较大,高并发场景下效率不高,而时间轮适合高并发场景,如下。

时间轮实现定时器

时间轮

可以用于高效的执行大量定时任务,如下为分层时间轮示意图:

如何实现一个定时器?看这一篇就够了

timewheel

时间轮可参考时钟进行理解,秒针(Seconds wheel)转一圈,则分针(Minutes wheel)走一格,分针(Minutes wheel)转一圈,则时针(Hours wheel)走一格。随着,时间的流逝,任务不断从上层流下下一层,最终到达秒针轮上,当秒针走到时执行

如上所示,时间轮大小为8格,秒针1s转动一格,其每一格所指向的链表保存着待执行任务。比如,如果当前指针指向1,要添加一个3s后执行的任务,由于1+3=4,即在第4格的链表中添加一个任务节点即可。如果要添加一个10s后执行的任务,10+1=11,超过了秒针轮范围,因此需要对8取模11 % 8 = 3,即,会把这个任务放到分针轮3对应的链表上,之后再从分针轮把任务丢到秒针轮上进行处理。也即,**秒针轮(Seconds wheel)**即保存着最近将要执行的任务,随着时间的流逝,任务会慢慢的从上层流到秒针轮中进行执行。

优点:加锁粒度较小,只需要加一个格子即可,一个格子对应一串链表;适合高并发场景

缺点不好删除

如何解决时间轮定时任务删除?

  1. 通过引用计数来解决

  2. 交由业务层处理,将删除标记设为true , 在函数回调中根据这个标记判断是否需要处理

这里介绍两种定时器实现方案,一种是简单实现方案,另一种是skynet较为复杂的实现。

时间轮实现定时器

简单时间轮实现方案

功能场景:由心跳包进行超时连接检测,10s未收到则断开连接

一般做法map<fd, *connect>每秒轮询这个结构,检测所有连接是否超时,收到心跳包,记录时间戳

缺点:效率很差,每次需要检测所有连接,时间复杂度为O(n)

优化分治大法,只需检测快过期的连接, 采用hash数组+链表形式,数组大小设置成16 :[0] + [1] + [2] + ... + [15] ,相同过期时间的放入一个数组,因此,每次只需检测最近过期的数组即可,不需要遍历所有。

数据结构定义

以下为定时器节点,增加引用计数ref, 只有当ref为0时删除连接。

class CTimerNode {public:    CTimerNode(int fd) : id(fd), ref(0) {}    void Offline() {this->ref = 0};    bool tryKill() {        if (this->ref == 0) return true;        DecRef();        if (this->ref == 0){            return true;        }        return false;    }    void IncRef() {this->ref++;}protected:    void DecRef() {this->ref--;}private:    int ref;    int id;}// 时间轮数组大小16, (x对16取余)==(x&1111) 落到0-15之间,即落到对应的数组const int TW_SIZE = 16;const in EXPIRE = 10; // 过期间隔const int TW_MASK = TW_SIZE - 1;  // 掩码, 用于对16取余static size_t iReadTick = 0;  // 滴答时钟typedef list<CTimerNode*> TimeList; // 数组每一个槽位对应一个listtypedef TimeList::iterator TimeListIter;typedef vector<TimeList> TimeWheel; // 时间轮
定时器接口
// 添加定时void AddTimeOut(TimerWheel &tw, CTimerNode *p) {    if (p) {        p->IncRef();        // 找到iRealTick对应数组的idx(槽位)        TimeList &le = tw[(iRealTick+EXPIRE) & TW_MASK];        le.push_back(p);  // 把时间节点加入list中    }}// 延时调用void AddTimeOutDelay(TimeWheel &tw, CTimerNode *p, size_t delay) {    if (p) {        p->IncRef();        TimeList &le = tw[(iRealTick + EXPIRE + delay) & TW_MASK];        le.push_back(p);    }}// 时间轮移动void TimerShift(TimeWheel &tw) {    size_t tick = iRealTick;    iRealTick++;    TimeList &le = tw[tick & TW_MASK];    TimeListIter iter = le.begin();    for (; iter != le.end(); iter++) {        CTimerNode *p = *iter;        if (p && p->trySkill()){            delete p;        }    }    le.clear();}

Skynet定时器实现方案

skynet中定时器数据结构

采用时间轮方式,hash表+链表实现,

struct timer_node {  //时间节点 struct timer_node *next;    uint32_t expire; //到期滴答数};struct link_list {  // 链表  struct timer_node head;  struct timer_node *tail;};struct timer { struct link_list near[256];  // 即将到来的定时器    struct link_list t[4][64]; // 相对较遥远的定时器    struct spinlock lock;    uint32_t time;  // 记录当前滴答数    uint64_t starttime;    uint64_t current;    uint64_t current_point;};

其中time32位无符号整数, 记录时间片对应数组near[256] ,表示即将到来的定时任务, t[4][64],表示较为遥远的定时任务。

定时器执行流程

如何实现一个定时器?看这一篇就够了

skynet_time_wheel

t[3] t[2] t[1] t[0] near
26-32位 20-26位 14-20位 8-14位 0-8位
[2^(8+6x3),2^(8+6x4)-1] [2^(8+6x2),2^(8+6x3)-1] [2^(8+6),2^(8+6x2)-1] [2^8,2^(8+6) -1] [0,2^8-1]
  • 首先检查节点的expiretime高24位是否相等,相等则将该节点添加到expire低8位值对应数组near的元素的链表中,不相等则进行下一步

  • 检查expiretime高18位是否相等,相等则将该节点添加到expire低第9位到第14位对应的6位二进制值对应数组t[0]的元素的链表中,否则进行下一步

  • 检查expiretime高12位是否相等,相等则将该节点添加到expire低第15位到第20位对应的6位二进制值对应数组t[1]的元素的链表中,如果不相等则进行下一步

  • 检查expiretime高6位是否相等,相等则将该节点添加到expire低第21位到第26位对应的6位二进制值对应数组t[2]的元素的链表中,如果不相等则进行下一步

  • 将该节点添加到expire低第27位到第32位对应的6位二进制值对应数组t[3]的元素的链表中

以下为具体实现,仅贴出主要接口,具体细节请参考skynet源代码。

定时器初始化

// skynet_start.c// skynet 启动入口voidskynet_start(struct skynet_config * config) {    ...    skynet_timer_init();    ...}// skynet_timer.cvoidskynet_timer_init(void) {    // 创建全局timer结构 TI    TI  = timer_create_timer();    uint32_t current = 0;    systime(&TI->starttime, &current);    TI->current = current;    TI->current_point = gettime();}

添加定时器

通过skynet_server.c中的cmd_timeout调用skynet_timeout添加新的定时器

// TI为全局的定时器指针static struct timer * TI = NULL; int skynet_timeout(uint32_t handle, int time, int session) {    ...    struct timer_event event;    event.handle = handle;  // callback    eveng.session = session;    // 添加新的定时器节点    timer_add(TI, &event, sizeof(event), time);    return session;}// 添加新的定时器节点static void timer_add(struct timer *T, void 8arg, size_t sz, int time) {    // 给timer_node指针分配空间,还需要分配timer_node + timer_event大小的空间,    // 之后通过node + 1可获得timer_event数据    struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);    memcpy(node+1,arg,sz);    SPIN_LOCK(T);    node->expire=time+T->time;    add_node(T, node);    SPIN_UNLOCK(T);}// 添加到定时器链表里,如果定时器的到期滴答数跟当前比较近(<2^8),表示即将触发定时器添加到T->near数组里// 否则根据差值大小添加到对应的T->T[i]中static void add_node(struct timer *T, struct timer_node *node) {    ...}

驱动方式

skynet启动时,会创建一个线程专门跑定时器,每帧(0.0025s)调用skynet_updatetime()

// skynet_start.cstatic void * thread_timer(void *p) {    struct monitor * m = p;    skynet_initthread(THREAD_TIMER);    for (;;) {        skynet_updatetime();  // 调用timer_update        skynet_socket_updatetime();        CHECK_ABORT        wakeup(m,m->count-1);        usleep(2500);  // 2500微秒 = 0.0025s        if (SIG) {            signal_hup();            SIG = 0;        }    }    ...}

每个定时器设置一个到期滴答数,与当前系统的滴答数(启动时为0,1滴答1滴答往后跳,1滴答==0.01s ) 比较得到差值interval;

如果interval比较小(0 <= interval <= 2^8-1),表示定时器即将到来,保存在第一部分前2^8个定时器链表中;否则找到属于第二部分对用的层级中。

// skynet_timer.cvoid skynet_updatetime(void) {    ...    uint32_t diff = (uint32_t)(cp - TI->current_point);     TI->current_point = cp;    TI->current += diff;    // diff单位为0.01s    for (i = 0; i < diff; i++){        timer_update(TI);            }}static voidtimer_update(struct timer *T) {    SPIN_LOCK(T);    timer_execute(T); // 检查T->near是否位空,有就处理到期定时器    timer_shift(T);  // 时间片time++,移动高24位的链表    timer_execute(T);    SPIN_UNLOCK(T);}// 每帧从T->near中触发到期得定时器static inline voidtimer_execute(struct timer *T) {  ...}// 遍历处理定时器链表中所有的定时器static inline voiddispatch_list(struct timer_node *current) {    ...}// 将高24位对应的4个6位的数组中的各个元素的链表往低位移static voidtimer_shift(struct timer *T) {    ...}// 将level层的idx位置的定时器链表从当前位置删除,并重新add_nodestatic void move_list(struct timer *T, int level, int idx) {}

最小堆实现定时器

最小堆实现例子:boost.asio采用二叉树,go采用四叉树, libuv

具体实现略。

总结

本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表红黑树时间轮实现定时器的思路和方法。

参考

跳表介绍

https://baijiahao.baidu.com/s?id=1633338040568845450&wfr=spider&for=pc

Skynet GitHub

https://github.com/cloudwu/skynet

skynet源码剖析

https://zhongyiqun.gitbooks.io/skynet/content/18-skynetding-shi-qi-yuan-li.html

继续阅读
weinxin
资源分享QQ群
本站是一个IT技术分享社区, 会经常分享资源和教程; 分享的时代, 请别再沉默!
沙海
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: