无锁循环队列的应用

循环链表结构

Posted by chensong on 2018-12-22 10::48::14

前言

为充分利用向量空间,克服”假溢出”现象的方法是:将向量空间想象为一个首尾相接的圆环,并称这种向量为循环向量。存储在其中的队列称为循环队列(Circular Queue)。这种循环队列可以以单链表的方式来在实际编程应用中来实现。

正文

一, 无锁循环队列的条件处理

循环队列中,由于入队时尾指针向前追赶头指针;出队时头指针向前追赶尾指针,造成队空和队满时头尾指针均相等。 因此,无法通过条件m_head == m_tail来判别队列是”空”还是”满”。

解决这个问题的方法至少有两种:

1, 另设一布尔变量以区别队列的空和满;

2, 另一种方式就是数据结构常用的: 队满时:(m_tail+1) % m_maxSize == m_head,m_maxSize为队列长度(所用数组大小),由于m_tail,m_head均为所用空间的指针,循环只是逻辑上的循环,所以需要求余运算。

当队列的大小为 m_numsize = 6;

“队已满”, 但是m_tail(5)+1=6!= m_head(0),对空间长度求余,作用就在此6%6=0=m_head(0)。

“队列满”时 当 index == (m_maxSize) 到最大时就要 下标重置到0

类型定义采用环状模型来实现队列,各数据成员的意义如下:

  1. m_head指定队首位置,删除一个元素就将m_head顺时针移动一位;
  2. m_tail指向元素要插入的位置,插入一个元素就将m_tail顺时针移动一位;
  3. m_size存放队列中元素的个数,当m_size等于m_maxSize时,不可再向队列中插入元素。

m_size=0

m_size = m_maxSize

T*	m_datas;
int	m_maxSize;  ////应根据具体情况定义该值队列的大小
volatile int    m_size;  ////计数器,记录队中元素总数
volatile int	m_head; ////头指针,队非空时指向队头元素
volatile int	m_tail; ////尾指针,队非空时指向队尾元素的下一位置

实现

template<typename T>
class cycle_queue
{
	//-- constructor/destructor
public:
	cycle_queue();
	~cycle_queue();

	//-- member function
public:
	//-- prop
	bool init(int uMaxNum);
	void destroy();

	bool empty() const { return (0 == m_size); }
	bool full() const { return (m_size == m_maxSize); }
	int	size() const { return m_size; }
	int	get_max_size() const { return m_maxSize; }

	bool push(const T& data);
	bool pop(T& data);
	void clear();

private:
	cycle_queue(const wcycle_queue&);
	cycle_queue& operator=(const wcycle_queue&);

private:
	T*	            m_datas;
	int	            m_maxSize;
	int    			m_size;
	int				m_head;
	int				m_tail;
};

template<typename T>
cycle_queue<T>::cycle_queue() : m_datas(NULL), m_maxSize(0), m_size(0), m_head(0), m_tail(0)
{

}

template<typename T>
cycle_queue<T>::~cycle_queue()
{
	destroy();
}

template<typename T>
void cycle_queue<T>::destroy()
{
	if (m_datas)
	{
		delete[] m_datas;
		m_datas = NULL;
	}
	m_maxSize = 0;
	m_head = 0;
	m_tail = 0;
}

template<typename T>
bool cycle_queue<T>::init(int uMaxNum)
{
	m_datas = new T[uMaxNum];
	if (!m_datas)
	{
		return false;
	}
	m_maxSize = uMaxNum;
	return true;
}

template<typename T>
bool cycle_queue<T>::push(const T& data)
{
	if (!full())
	{
		m_datas[m_tail] = data;
		m_tail = (m_tail + 1) % m_maxSize;
		++m_size;
		return true;
	}
	return false;
}

template<typename T>
bool cycle_queue<T>::pop(T& data)
{
	if (!empty())
	{
		data = m_datas[m_head];
		m_head = (m_head + 1) % m_maxSize;
		--m_size;
		return true;
	}
	return false;
}

template<typename T>
void cycle_queue<T>::clear()
{
	m_size = 0;
	m_head = 0;
	m_tail = 0;
}

二, volatile 关键字的使用

int b=i;

volatile 指出 i是随时可能发生变化的,每次使用它的时候必须从i的地址中读取,因而编译器生成的汇编代码会重新从i的地址读取数据放在b中。而优化做法是,由于编译器发现两次从i读数据的代码之间的代码没有对i进行过操作,它会自动把上次读的数据放在b中。而不是重新从i里面读。这样一来,如果i是一个寄存器变量或者表示一个端口数据就容易出错,所以说volatile可以保证对特殊地址的稳定访问。

三, 定时器的不同实现及优缺点

“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。

其中,单机TCP同时在线量约在10w级别,keepalive请求包大概30s一次,吞吐量约在3000qps。

一般来说怎么实现这类需求呢?

“轮询扫描法”

  1. 用一个std::map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time
  2. 当某个用户uid有请求包来到,实时更新这个Map
  3. 启动一个timer,当map中不为空时,轮询扫描这个map,看每个uid的last_packet_time是否超过30s,如果超过则进行超时处理

“多timer触发法”

  1. 用一个Map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time
  2. 当某个用户uid有请求包来到,实时更新这个Map,并同时对这个uid请求包启动一个timer,30s之后触发
  3. 每个uid请求包对应的timer触发后,看Map中,查看这个uid的last_packet_time是否超过30s,如果超过则进行超时处理

方案一:只启动一个timer,但需要轮询,效率较低

方案二:不需要轮询,但每个请求包要启动一个timer,比较耗资源

特别在同时在线量很大时,很容易CPU100%,如何高效维护和触发大量的定时/超时任务,是本文要讨论的问题。

二、环形队列法

废话不多说,三个重要的数据结构:

  1. 30s超时,就创建一个index从0到30的环形队列(本质是个数组)
  2. 环上每一个slot是一个std::set,任务集合
  3. 同时还有一个std::map<uid, index>,记录uid落在环上的哪个slot里

同时:

  1. 启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
  2. 有一个Current Index指针来标识刚检测过的slot

当有某用户uid有请求包到达时:

  1. 从Map结构中,查找出这个uid存储在哪一个slot里
  2. 从这个slot的Set结构中,删除这个uid
  3. 将uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到
  4. 更新map,这个uid对应slot的index值

哪些元素会被超时掉呢?

Current Index每秒种移动一个slot,这个slot对应的std::set中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。

所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。

优势:

  1. 只需要1个timer
  2. timer每1s只需要一次触发,消耗CPU很低
  3. 批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉

结语

无锁循环队列的应用中有一个重要的算法, 就是判断是否到了最后一个下标的修改下标到第一个

m_head  = (m_head + 1) % m_maxsize;