redis源码分析之bitops数据使用场景

缓存穿透,缓存击穿,缓存雪崩,统计数量汉明宽度

Posted by chensong on 2020-09-29 22::22::24

=====================================================

redis源码学习系列文章:

redis源码分析之sha1算法分析

redis源码分析之字典源码分析

redis源码分析之内存编码分析intset, ziplist编码分析

redis源码分析之跳跃表

redis源码分析之内存淘汰策略的原理分析

redis源码分析之对象系统源码分析string, list链表,hash哈希,set集合,zset有序集合

redis源码分析之异步进程保存数据rdb文件和aof文件源码分析

redis源码之sentinel高可用架构分析

redis源码分析之集群之一的槽的分配算法crc16原理分析

=====================================================

在我的github上会持续更新Redis代码的中文分析,地址送出https://github.com/chensongpoixs/credis_source,共同学习进步

前言

redis中bitops的数据结构在 在这个数据结构中说存在时有可能存在也有可能不存在, 但是说不存在时就一定不存在的所以在高并发中使用的缓存穿透,缓存击穿,缓存雪崩

正文

一, setbit设置方法

一个字节等于八个bit 2^3


/* SETBIT key offset bitvalue */
void setbitCommand(client *c) {
    robj *o;
    char *err = "bit is not an integer or out of range";
    size_t bitoffset;
    ssize_t byte, bit;
    int byteval, bitval;
    long on;
    // 得到偏移量 offset的值
    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
        return;

    if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)
        return;

    /* Bits can only be set or cleared... */
    //只能设置一个比特位  on不是退出
    if (on & ~1) {
        addReplyError(c,err);
        return;
    }

    if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return;

    /* Get current values */
    // 这里>> 3就相当于 除以 8 === > [2^3 = 8]
    byte = bitoffset >> 3;     //  例子: 53   --> 0011 0101 => 0000 0011
    byteval = ((uint8_t*)o->ptr)[byte];
    bit = 7 - (bitoffset & 0x7); // 底的3位拿到 -> 
    bitval = byteval & (1 << bit); // 0001 ====> 1000, 0100, 0010

    /* Update byte with new bit value and return original value */
    byteval &= ~(1 << bit);
    byteval |= ((on & 0x1) << bit);
    ((uint8_t*)o->ptr)[byte] = byteval;
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c, bitval ? shared.cone : shared.czero);
}

二, getbit方法

/* GETBIT key offset */
void getbitCommand(client *c) {
    robj *o;
    char llbuf[32];
    size_t bitoffset;
    size_t byte, bit;
    size_t bitval = 0;

    if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
        return;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_STRING)) return;

    byte = bitoffset >> 3;
    bit = 7 - (bitoffset & 0x7);
    if (sdsEncodedObject(o)) {
        if (byte < sdslen(o->ptr))
            bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
    } else {
        if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr))
            bitval = llbuf[byte] & (1 << bit);
    }

    addReply(c, bitval ? shared.cone : shared.czero);
}

三, bitfield方法

四, bitop

五, bitcount

这里redis使用两种方法进行统计二进制中有多少1

  1. 查表法
  2. 汉明宽度(分而治之的思想)

这里主要说明一下汉明宽度计算思想

//汉明宽度计算的思路是使用分而治之的思想处理的

这里是先计算二进制下标[0,1],[2, 3][4,5],[6, 7][8,9]… 上的位置”1”的个数所以二个bit就可以存放了

typedef unsigned int UINT32;
const UINT32 m1  = 0x55555555;  // 01010101010101010101010101010101
const UINT32 m2  = 0x33333333;  // 00110011001100110011001100110011
const UINT32 m4  = 0x0f0f0f0f;  // 00001111000011110000111100001111
const UINT32 m8  = 0x00ff00ff;  // 00000000111111110000000011111111
const UINT32 m16 = 0x0000ffff;  // 00000000000000001111111111111111
const UINT32 h01 = 0x01010101;  // the sum of 256 to the power of 0, 1, 2, 3

/* This is a naive implementation, shown for comparison, and to help in 
* understanding the better functions. It uses 20 arithmetic operations
* (shift, add, and). */
int popcount(UINT32 x)
{
  x = (x & m1) + ((x >> 1) & m1);
  x = (x & m2) + ((x >> 2) & m2);
  x = (x & m4) + ((x >> 4) & m4);
  x = (x & m8) + ((x >> 8) & m8);
  x = (x & m16) + ((x >> 16) & m16);
  return x;
}
 例子:  先一个bit为一个块, 再以4个bit为块的处理 以此类推  
				   0110 0010 
0011 0011  [公式 = ((x & 0011 0011) + ((x >> 1) & 0011 0011)) ]  => 0010 0001 
0000 1111  [公式 = ((x & 0000 1111) + ((x >> 4) & 0000 1111)) ]  => 0000 0011 
/* Count number of bits set in the binary array pointed by 's' and long
 * 'count' bytes. The implementation of this function is required to
 * work with a input string length up to 512 MB. */
size_t redisPopcount(void *s, long count) {
    size_t bits = 0;
    unsigned char *p = s;
    uint32_t *p4;
    /**
     * 这边8个比特位对应"1"个数罗列出来了  char = > 256   使用空间换时间的做法
     */
    static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};

    /* Count initial bytes not aligned to 32 bit. */
    /// 这边有一个技巧  就是 & 0X3  低2位就进行查表, 低的2位不是0就走下面的的汉明重量 或者直接一个字节的查表
    while((unsigned long)p & 3 && count) {
        bits += bitsinbyte[*p++];
        count--;
    }

    /* Count bits 28 bytes at a time */
    //汉明宽度计算的思路是使用分而治之的思想处理的  
    /**
     *  例子:  先一个bit为一个块, 再以4个bit为块的处理 以此类推  
     *                   0110 0010 
     *   0011 0011  [公式 = ((x & 0011 0011) + ((x >> 1) & 0011 0011)) ]  => 0010 0001 
     *   0000 1111  [公式 = ((x & 0000 1111) + ((x >> 4) & 0000 1111)) ]  => 0000 0011 
     */
    // 汉明宽度处理是28个字节一处理比查表的28次要快 充分利用CPU
    p4 = (uint32_t*)p;
    while(count>=28) 
    {
        uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7;

        aux1 = *p4++;
        aux2 = *p4++;
        aux3 = *p4++;
        aux4 = *p4++;
        aux5 = *p4++;
        aux6 = *p4++;
        aux7 = *p4++;
        count -= 28;
        // 0101 0101 0101 0101 0101 0101 0101 0101 ==> 0x55555555
        // 0011 0011 0011 0011 0011 0011 0011 0011 ==> 0x33333333
        // 0000 1111 0000 1111 0000 1111 0000 1111 ==> 0x0F0F0F0F
        // 0x01010101
        aux1 = aux1 - ((aux1 >> 1) & 0x55555555);                   // 0101 0101 0101 0101 0101 0101 0101 0101 ==> 0x55555555
        aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);    // 0011 0011 0011 0011 0011 0011 0011 0011 ==> 0x33333333
        aux2 = aux2 - ((aux2 >> 1) & 0x55555555);
        aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333);
        aux3 = aux3 - ((aux3 >> 1) & 0x55555555);
        aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333);
        aux4 = aux4 - ((aux4 >> 1) & 0x55555555);
        aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333);
        aux5 = aux5 - ((aux5 >> 1) & 0x55555555);
        aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333);
        aux6 = aux6 - ((aux6 >> 1) & 0x55555555);
        aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333);
        aux7 = aux7 - ((aux7 >> 1) & 0x55555555);
        aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333);
        // 0000 1111 0000 1111 0000 1111 0000 1111 ==> 0x0F0F0F0F
        // 0000 0000 1111 1111 0000 0000 1111 1111 ==> 0x01010101
        // y = ( x + (x >> 4))
      
        bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) +  ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) + ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) + ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) + ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) + ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) + ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24; 
    }
    /* Count the remaining bytes. */
    p = (unsigned char*)p4;
    while(count--) 
    {
        bits += bitsinbyte[*p++];
    }
    return bits;
}

/* BITCOUNT key [start end] */
void bitcountCommand(client *c) {
    robj *o;
    long start, end, strlen;
    unsigned char *p;
    char llbuf[LONG_STR_SIZE];

    /* Lookup, check for type, and return 0 for non existing keys. */
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,o,OBJ_STRING)) return;
    p = getObjectReadOnlyString(o,&strlen,llbuf);

    /* Parse start/end range if any. */
    if (c->argc == 4) {
        if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
            return;
        if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
            return;
        /* Convert negative indexes */
        if (start < 0 && end < 0 && start > end) {
            addReply(c,shared.czero);
            return;
        }
        if (start < 0) start = strlen+start;
        if (end < 0) end = strlen+end;
        if (start < 0) start = 0;
        if (end < 0) end = 0;
        if (end >= strlen) end = strlen-1;
    } else if (c->argc == 2) {
        /* The whole string. */
        start = 0;
        end = strlen-1;
    } else {
        /* Syntax error. */
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Precondition: end >= 0 && end < strlen, so the only condition where
     * zero can be returned is: start > end. */
    if (start > end) {
        addReply(c,shared.czero);
    } else {
        long bytes = end-start+1;

        addReplyLongLong(c,redisPopcount(p+start,bytes));
    }
}

六, bitops

结语