redis源码分析之跳跃表

skiplist

Posted by chensong on 2019-04-15 00::05::03

=====================================================

redis源码学习系列文章:

redis源码分析之sha1算法分析

redis源码分析之字典源码分析

redis源码分析之内存编码分析intset, ziplist编码分析

redis源码分析之跳跃表

redis源码分析之内存淘汰策略的原理分析

redis源码分析之对象系统源码分析string, list链表,hash哈希,set集合,zset有序集合

redis源码分析之异步进程保存数据rdb文件和aof文件源码分析

=====================================================

在我的github上会持续更新Redis代码的中文分析,地址送出https://github.com/chensongpoixs/credis_source,共同学习进步

前言

跳跃表数据结构可以 与平衡树和红黑树查询效率。 正常时间复杂度是O(logn), 最差时间复杂度是O(n)

skiplist原理介绍

这样所有新增加的指针连成了一个新的链表,但它包含的节点个数只有原来的一半(上图中是9, 45, 99)。现在当我们想查找数据的时候,可以先沿着这个新链表进行查找。当碰到比待查数据大的节点时,再回到原来的链表中进行查找。比如,我们想查找55,查找的路径是沿着下图中标红的指针所指向的方向进行的:

  1. 55首先和9比较,再和45比较,比它们都大,继续向后比较。

  2. 但55和99比较的时候,比99要小,因此回到下面的链表(原链表),与67比较。

  3. 55比67要小,沿下面的指针继续向前和45比较。55比45大,说明待查数据55在原链表中不存在,而且它的插入位置应该在45和67之间。

正文

1, 数据结构介绍

redis源码中的跳跃表数据结构分为 链表(zskiplist)和节点(zskiplistNode)

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode 
{
    sds						ele;			/*成员object对象*/
    double					score;			/*分数字段依赖此值对skiplist进行排序*/
    struct zskiplistNode *	backward;		/*插入层中指向上一个元素level数组*/
    struct zskiplistLevel 
	{
        struct zskiplistNode *forward;	/*每层中指向下一个元素指针*/
        unsigned long span;				/*距离下一个元素之间元素数量, 即forward指向的元素*/
    } level[];
} zskiplistNode;

typedef struct zskiplist 
{
    struct zskiplistNode *header, *tail;/*跳跃表头节点和尾节点*/
    unsigned long length;				/*跳跃表中元素个数*/
    int level;							/*跳跃表当前最大层数*/
} zskiplist;

typedef struct zset 
{
    dict *dict;
    zskiplist *zsl;
} zset;

2, 跳跃表的创建流程

创建初始化跳跃表的数据结构

/* Create a new skiplist. */
zskiplist *zslCreate(void) 
{
    int			j;
    zskiplist *·zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
	/*初始化创建一个头节点, 初始化节点信息*/
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; ++j)
	{
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}


//---------
/* Create a skiplist node with the specified number of levels.
 * The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) 
{
    zskiplistNode *zn =
        zmalloc(sizeof(*zn) + level * sizeof(struct zskiplistLevel));
    zn->score	=	score;
    zn->ele		=	ele;
    return	zn;
}

3, 插入操作

插入的操作:

  1. 先找到层(level)
  2. 该层是否存在不存在, 就创建该层, 把数据放到该层节点上面

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) 
{
    // updata[]数组记录每一层位于插入节点的前一个节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // rank[]记录每一层位于插入节点的前一个节点的排名
    //在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表中的排位
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    //表头节点
    x = zsl->header;
	/*从头节点开始搜索, 一层一层向下搜索, 直到直到最后一层, update数组中保存着每层应该插入的位置*/
    for (i = zsl->level - 1; i >= 0; i--) 
	{
		// 注意观察 '=='  节点 [node] ---> 区分层 [level] ---> 是否是下一个节点
        rank[i] = i == (zsl->level - 1) ? 0 : rank[i + 1];  
        while (x->level[i].forward && (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&  sdscmp(x->level[i].forward->ele,   ele) < 0)))
        {
			//记录跨越了多少个节点  
            rank[i] += x->level[i].span;
			//查找下一个节点
            x = x->level[i].forward;
        }
        // 存储当前层上位于插入节点的前一个节点,找下一层的插入节点
        update[i] = x; // _ptr
    }
    
	/* 随机一个层数, 如果随机的层数是新的层数, 则需要给update数组中新的层数赋值*/
    level = zslRandomLevel();// [00-64]
    // 如果level大于当前存储的最大level值
    // 设定rank数组中大于原level层以上的值为0--为什么设置为0
    // 同时设定update数组大于原level层以上的数据
    if (level > zsl->level) 
	{
		//是从增加的层数开始增加的
        for (i = zsl->level; i < level; i++) 
		{
            //因为这一层没有节点,所以重置rank[i]为0
            rank[i] = 0;
            //因为这一层还没有节点,所以节点的前一个节点都是头节点
            update[i] = zsl->header;
            //在未添加新节点之前,需要更新的节点跨越的节点数目自然就是zsl->length---
			//因为整个层只有一个头结点----->言外之意头结点的span都是链表长度
            update[i]->level[i].span = zsl->length;
        }
        // 更新level值(max层数)
        zsl->level = level;
    }
	/*创建新的节点插入到update数组对应的层*/
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) 
	{
		// next
        x->level[i].forward = update[i]->level[i].forward;// _ptr
        // 
		update[i]->level[i].forward = x;
		/*
		header                update[i]     x    update[i]->forward
		|-----------|-----------|-----------|-----------|-----------|-----------|
		|<---update[i].span---->|
		|<-------rank[i]------->|
		|<-------------------rank[0]------------------->|

		更新update数组中span值和新插入元素span值, rank[0]存储的是x元素距离头部的距离, rank[i]存储的是update[i]距离头部的距离, 上面给出了示意图
		*/
		//x->level[i].span = 从x到update[i]->forword的span数目, 
		//原来的update[i]->level[i].span = 从update[i]到update[i]->level[i]->forward的span数目 
		//所以x->level[i].span = 原来的update[i]->level[i].span - (rank[0] - rank[i]); 
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
		//对于update[i]->level[i].span值的更新由于在update[i]与update[i]->level[i]->forward之间又添加了x, 
		//update[i]->level[i].span = 从update[i]到x的span数目, 
		//由于update[0]后面肯定是新添加的x,所以自然新的update[i]->level[i].span = (rank[0] - rank[i]) + 1; 

		//提示: update[i]和x[i]之间肯定没有节点了
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
	/* level可能小zsl->level, 无变动的元素span依次增加1*/
    for (i = level; i < zsl->level; i++) 
	{
        update[i]->level[i].span++;
    }
	/*上一个元素level数组, 重新赋值*/
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
	if (x->level[0].forward)
	{
		x->level[0].forward->backward = x;
	}
	else
	{ 
		/*下一个元素为空,则表示x为尾部元素*/
		zsl->tail = x;
	}
	// 跳跃表长度+1
    zsl->length++;
    return x;
}

4, 删除节点操作


/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

zset-max-ziplist-entries 128
zset-max-ziplist-value 64

这个配置的意思是说,在如下两个条件之一满足的时候,ziplist会转成zset(具体的触发条件参见t_zset.c中的zaddGenericCommand相关代码):

  1. 当sorted set中的元素个数,即(数据, score)对的数目超过128的时候,也就是ziplist数据项超过256的时候。
  2. 当sorted set中插入的任意一个数据的长度超过了64的时候。

结语