IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Redis 中快排算法详解(pqsort.c)

    klion26发表于 2015-07-26 13:20:34
    love 0

    Redis 中的快排中的思想是一篇叫做“Engineering a Sort Function”里面的思想。做到比标准库里面的快排更快,更稳定,(在自己电脑上做测试的时候,基本一样的时间),按照论文来说主要用到下面的优化:

    1. 元素交换的时候更省时间
    2. 在某些情况下使用了冒泡排序,以及插入排序
    3. 选择一个接近中位数的数做 pivot

    下面从上面三个方面入手分析代码

    对于元素交换方面,代码里面的做法首先会查看待排序的数组是否按机器字节对齐,以及每个元素所占的字节长度是否等于机器字节长度,代码如下:

    #define SWAPINIT(a, es) swaptype = ((char *)a - (char *)0) % sizeof(long) || \
    	es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1;

    注意 || 的优先级比 ?: 的优先级要高,不然这里会理解错误的。这里的 swaptype 就是用来确定交换时的类型的,有如下三种可能:

    1. 数组首地址对齐,且数组元素占的字节数和 sizeof(long) 一样的。swaptype == 0
    2. 数组首地址对齐,数组元素占的字节数是 sizeof(long) 的倍数的。swaptype == 1
    3. 数组首地址不对齐;或者首地址对齐,但是数组元素所占的字节数不能整除 sizeof(long) 的。swaptype == 2

    针对不能的情况,后面交换数据的时候分别有不同的选择。对于第一种和第二种,直接按照 long 类型来交换,其他的则按照 char 类型来交换。

    if (swaptype <= 1) 
        swapcode(long, a, b, n)
    else 
        swapcode(char, a, b, n)

    对于交换,本方法由于把数据分为了小于 pivot 的,大于 pivot 的和等于 pivot 的。因此还有数组的交换,将所有等于 pivot 的都换到中间。

    #define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)

    在进行完一次排序之后,数组内部的分布如下所示:

    | pivot |  等于 pivot 的所有元素A |  小于 pivot 的所有元素  |大于 pivot 的所有元素 | 等于pivot 的所有元素B |

    然后我们需要把小于 pivot 的所有元素换到数组的最左边,把原来数组最右边的等于 pivot 的所有元素B换到数组的中间。变成如下排列

    | 小于 pivot 的所有元素  |  等于 pivot 的所有元素  |  大于 pivot 的所有元素 |

    这里就需要用到上面的 vecswap,就是两段数之间的交换。将所有等于 pivot 的元素放到中间,所有小于 pivot 的元素放到左边,所有大于 pivot 的元素放到右边

    2. 利用冒泡和插入排序。

    在本方法中,对于少于7个元素的都利用冒泡排序解决(7 是一个 Magical number),然后对于接近已排好序的数组,利用插入排序,我们知道快速排序对于已排好序的数组进行排序复杂度是很高的,因此在内部采用了插入排序解决这一问题

    if (swap_cnt == 0) {  /* Switch to insertion sort */
            /** {a[0]}  {a[1]...a[k]}  {a[k+1]...a[n-1]}
             * a[i] < a[0]  for 1<= i < k+1
             * a[i] > a[0]  for k+1 <= i < n
             **/
    		for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
    			for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0; 
    			     pl -= es)
    				swap(pl, pl - es);
    		return;
    	}

    这段代码中的 swap_cnt 是记录了在第一次排序过程中是否进行过交换,等于零表示没有进行过交换。

    3. 找一个更合适的 pivot 这一点,本方法利用下面的方法来进行 pivot 的寻找,其中 pm 指向最终我们设定的 pivot

    pm = (char *) a + (n / 2) * es; /** 首先用数组中间的那个元素做 pivot **/
    	if (n > 7) { /** 如果元素个数大于 7 **/
    		pl = (char *) a;/* 首元素 */
    		pn = (char *) a + (n - 1) * es; /* 末尾元素 */
    		if (n > 40) { /** 如果大于40个元素(40 也是一个 Magical Number) **/
    			d = (n / 8) * es; /** 利用下面的 9 个数来近似整个数组的中位数 **/
    			pl = med3(pl, pl + d, pl + 2 * d, cmp); /* pl 这三个数里面的一个中位数 */
    			pm = med3(pm - d, pm, pm + d, cmp); /* pm 是这三个数的中位数 */
    			pn = med3(pn - 2 * d, pn - d, pn, cmp); /* pn 是这三个数的中位数 */
    		}
    		pm = med3(pl, pm, pn, cmp); /* pm 是 pl pm pn 的中位数,近似整个数组的中位数 */
    	}

    这样找到的 pivot 不会偏向很严重,从而在快排的时候,不会导致某一边(大于 pivot 和小于 pivot 两边)的数据量比较大。

    另外在对数组的右半部分进行排序的时候,利用了 goto,而不是递归,这样可以节省栈空间。

    我在 Github 上存放了一份带所有注释的代码,地址如下:https://github.com/klion26/redis-1.0-annotation/blob/master/pqsort.c

    参考文章:

    1. http://cs.fit.edu/~pkc/classes/writing/samples/bentley93engineering.pdf

    2. http://blog.csdn.net/guodongxiaren/article/details/45567291

    您可能也喜欢:

    二叉树的非递归遍历

    几个简单的数学题

    Linux命令行和shell脚本编程笔记

    github连接出现port 22:Bad file number问题

    XP和Fedora的双系统之路
    无觅


沪ICP备19023445号-2号
友情链接