前言
为充分利用向量空间,克服”假溢出”现象的方法是:将向量空间想象为一个首尾相接的圆环,并称这种向量为循环向量。存储在其中的队列称为循环队列(Circular Queue)。这种循环队列可以以单链表的方式来在实际编程应用中来实现。
正文
一, 无锁循环队列的条件处理
循环队列中,由于入队时尾指针向前追赶头指针;出队时头指针向前追赶尾指针,造成队空和队满时头尾指针均相等。 因此,无法通过条件m_head == m_tail来判别队列是”空”还是”满”。
解决这个问题的方法至少有两种:
1, 另设一布尔变量以区别队列的空和满;
2, 另一种方式就是数据结构常用的: 队满时:(m_tail+1) % m_maxSize == m_head,m_maxSize为队列长度(所用数组大小),由于m_tail,m_head均为所用空间的指针,循环只是逻辑上的循环,所以需要求余运算。
当队列的大小为 m_numsize = 6;
“队已满”, 但是m_tail(5)+1=6!= m_head(0),对空间长度求余,作用就在此6%6=0=m_head(0)。
“队列满”时 当 index == (m_maxSize) 到最大时就要 下标重置到0
类型定义采用环状模型来实现队列,各数据成员的意义如下:
- m_head指定队首位置,删除一个元素就将m_head顺时针移动一位;
- m_tail指向元素要插入的位置,插入一个元素就将m_tail顺时针移动一位;
- m_size存放队列中元素的个数,当m_size等于m_maxSize时,不可再向队列中插入元素。
m_size=0
m_size = m_maxSize
T* m_datas;
int m_maxSize; ////应根据具体情况定义该值队列的大小
volatile int m_size; ////计数器,记录队中元素总数
volatile int m_head; ////头指针,队非空时指向队头元素
volatile int m_tail; ////尾指针,队非空时指向队尾元素的下一位置
实现
template<typename T>
class cycle_queue
{
//-- constructor/destructor
public:
cycle_queue();
~cycle_queue();
//-- member function
public:
//-- prop
bool init(int uMaxNum);
void destroy();
bool empty() const { return (0 == m_size); }
bool full() const { return (m_size == m_maxSize); }
int size() const { return m_size; }
int get_max_size() const { return m_maxSize; }
bool push(const T& data);
bool pop(T& data);
void clear();
private:
cycle_queue(const wcycle_queue&);
cycle_queue& operator=(const wcycle_queue&);
private:
T* m_datas;
int m_maxSize;
int m_size;
int m_head;
int m_tail;
};
template<typename T>
cycle_queue<T>::cycle_queue() : m_datas(NULL), m_maxSize(0), m_size(0), m_head(0), m_tail(0)
{
}
template<typename T>
cycle_queue<T>::~cycle_queue()
{
destroy();
}
template<typename T>
void cycle_queue<T>::destroy()
{
if (m_datas)
{
delete[] m_datas;
m_datas = NULL;
}
m_maxSize = 0;
m_head = 0;
m_tail = 0;
}
template<typename T>
bool cycle_queue<T>::init(int uMaxNum)
{
m_datas = new T[uMaxNum];
if (!m_datas)
{
return false;
}
m_maxSize = uMaxNum;
return true;
}
template<typename T>
bool cycle_queue<T>::push(const T& data)
{
if (!full())
{
m_datas[m_tail] = data;
m_tail = (m_tail + 1) % m_maxSize;
++m_size;
return true;
}
return false;
}
template<typename T>
bool cycle_queue<T>::pop(T& data)
{
if (!empty())
{
data = m_datas[m_head];
m_head = (m_head + 1) % m_maxSize;
--m_size;
return true;
}
return false;
}
template<typename T>
void cycle_queue<T>::clear()
{
m_size = 0;
m_head = 0;
m_tail = 0;
}
二, volatile 关键字的使用
int b=i;
volatile 指出 i是随时可能发生变化的,每次使用它的时候必须从i的地址中读取,因而编译器生成的汇编代码会重新从i的地址读取数据放在b中。而优化做法是,由于编译器发现两次从i读数据的代码之间的代码没有对i进行过操作,它会自动把上次读的数据放在b中。而不是重新从i里面读。这样一来,如果i是一个寄存器变量或者表示一个端口数据就容易出错,所以说volatile可以保证对特殊地址的稳定访问。
三, 定时器的不同实现及优缺点
“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。
其中,单机TCP同时在线量约在10w级别,keepalive请求包大概30s一次,吞吐量约在3000qps。
一般来说怎么实现这类需求呢?
“轮询扫描法”
- 用一个std::map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time
- 当某个用户uid有请求包来到,实时更新这个Map
- 启动一个timer,当map中不为空时,轮询扫描这个map,看每个uid的last_packet_time是否超过30s,如果超过则进行超时处理
“多timer触发法”
- 用一个Map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time
- 当某个用户uid有请求包来到,实时更新这个Map,并同时对这个uid请求包启动一个timer,30s之后触发
- 每个uid请求包对应的timer触发后,看Map中,查看这个uid的last_packet_time是否超过30s,如果超过则进行超时处理
方案一:只启动一个timer,但需要轮询,效率较低
方案二:不需要轮询,但每个请求包要启动一个timer,比较耗资源
特别在同时在线量很大时,很容易CPU100%,如何高效维护和触发大量的定时/超时任务,是本文要讨论的问题。
二、环形队列法
废话不多说,三个重要的数据结构:
- 30s超时,就创建一个index从0到30的环形队列(本质是个数组)
- 环上每一个slot是一个std::set
,任务集合 - 同时还有一个std::map<uid, index>,记录uid落在环上的哪个slot里
同时:
- 启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
- 有一个Current Index指针来标识刚检测过的slot
当有某用户uid有请求包到达时:
- 从Map结构中,查找出这个uid存储在哪一个slot里
- 从这个slot的Set结构中,删除这个uid
- 将uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到
- 更新map,这个uid对应slot的index值
哪些元素会被超时掉呢?
Current Index每秒种移动一个slot,这个slot对应的std::set
所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。
优势:
- 只需要1个timer
- timer每1s只需要一次触发,消耗CPU很低
- 批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉
结语
无锁循环队列的应用中有一个重要的算法, 就是判断是否到了最后一个下标的修改下标到第一个
m_head = (m_head + 1) % m_maxsize;