前言
libevent线程是不安全的, 现在服务器都是多核的如何充分使用cpu就要使用多线程。
正文
一, 谈谈libevent如何使用多线程呢
我们在上学的时候, 经常去学校的食堂吃饭。 有几种情况
1, 一个窗口排队等待打饭
一个窗口就相当于我们网络中io单线程的处理, 不会出现惊群效应(linux 3.0+ epoll这个问题已经解决了) ,这个效率会比较底下, 学生排队的时间过长
2, 多个窗口排队等待打饭
多个窗口排队等待打饭, 一个窗口就相当于我们一个线程, 处理的速度就会比较快的, 效率较高。
二, 谈谈libevent如何多线程中通信的
libevent使用两种方式通信
- 管道 (pipe)
- socket
1,我们一起看看怎么使用socket 通信的
evutil_socketpair 创建socket 事件通知int (*notify)(struct event_base *) = evthread_notify_base_default; // io 线程的操作回调函数 base->th_notify_fn = notify;
evthread_notify_base_default这个函数就通知事件写入1个字节数据
接收线程 void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default; // main thread 回调函数
放到事件队列中去了
evthread_notify_drain_default函数就修改变量is_notify_pending
2, 唤醒线程的流程
① event_add_internal
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base))
{
++base->current_event_waiters;
// 等待 通知工作
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
② event_del_internal
if (base->current_event == ev && !EVBASE_IN_THREAD(base))
{
++base->current_event_waiters; // wait
// 等待 通知工作
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
③ event_active_nolock
if (base->current_event == ev && !EVBASE_IN_THREAD(base))
{
++base->current_event_waiters;
// 等待 通知工作
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
2. 广播的函数
① event_process_active_single_queue
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq))
{
if (ev->ev_events & EV_PERSIST)
{
event_queue_remove(base, ev, EVLIST_ACTIVE);
}
else
{
event_del_internal(ev);
}
if (!(ev->ev_flags & EVLIST_INTERNAL))
{
++count;
}
event_debug((
"event_process_active: event: %p, %s%scall %p",
ev,
ev->ev_res & EV_READ ? "EV_READ " : " ",
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->current_event = ev;
base->current_event_waiters = 0;
#endif
switch (ev->ev_closure)
{
case EV_CLOSURE_SIGNAL:
event_signal_closure(base, ev);
break;
case EV_CLOSURE_PERSIST:
event_persist_closure(base, ev);
break;
default:
case EV_CLOSURE_NONE:
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
(int)ev->ev_fd, ev->ev_res, ev->ev_arg); // 调用业务的回调函数
break;
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->current_event = NULL;
if (base->current_event_waiters)
{
base->current_event_waiters = 0;
// 通知 广播 main all thread wait事件 起来工作了 ^_^
EVTHREAD_COND_BROADCAST(base->current_event_cond);
}
#endif
if (base->event_break)
{
return -1;
}
}
三 , 多线程
上面的等待是否是多线程每个线程独有event_base的都启动一个线程
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <string.h>
#include <errno.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <errno.h>
#include <event2/event.h>
#include <stdlib.h>
#include <pthread.h>
#include <fcntl.h>
#define ERR_EXIT(m) \
do\
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)\
void send_fd(int sock_fd, int send_fd)
{
int ret;
struct msghdr msg;
struct cmsghdr *p_cmsg;
struct iovec vec;
char cmsgbuf[CMSG_SPACE(sizeof(send_fd))];
int *p_fds;
char sendchar = 0;
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);
p_cmsg = CMSG_FIRSTHDR(&msg);
p_cmsg->cmsg_level = SOL_SOCKET;
p_cmsg->cmsg_type = SCM_RIGHTS;
p_cmsg->cmsg_len = CMSG_LEN(sizeof(send_fd));
p_fds = (int *)CMSG_DATA(p_cmsg);
*p_fds = send_fd; // 通过传递辅助数据的方式传递文件描述符
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = &vec;
msg.msg_iovlen = 1; //主要目的不是传递数据,故只传1个字符
msg.msg_flags = 0;
vec.iov_base = &sendchar;
vec.iov_len = sizeof(sendchar);
ret = sendmsg(sock_fd, &msg, 0);
if (ret != 1)
{
ERR_EXIT("sendmsg");
}
}
int recv_fd(const int sock_fd)
{
int ret;
struct msghdr msg;
char recvchar;
struct iovec vec;
int recv_fd;
char cmsgbuf[CMSG_SPACE(sizeof(recv_fd))];
struct cmsghdr *p_cmsg;
int *p_fd;
vec.iov_base = &recvchar;
vec.iov_len = sizeof(recvchar);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = &vec;
msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);
msg.msg_flags = 0;
p_fd = (int *)CMSG_DATA(CMSG_FIRSTHDR(&msg));
*p_fd = -1;
ret = recvmsg(sock_fd, &msg, 0);
if (ret != 1)
{
ERR_EXIT("recvmsg");
}
p_cmsg = CMSG_FIRSTHDR(&msg);
if (p_cmsg == NULL)
{
ERR_EXIT("no passed fd");
}
p_fd = (int *)CMSG_DATA(p_cmsg);
recv_fd = *p_fd;
if (recv_fd == -1)
{
ERR_EXIT("no passed fd");
}
return recv_fd;
}
//-------------------------------------------------
typedef struct {
pthread_t tid;
struct event_base *base;
struct event event;
int read_fd;
int write_fd;
}LIBEVENT_THREAD;
typedef struct {
pthread_t tid;
struct event_base *base;
}DISPATCHER_THREAD;
const int thread_num = 10;
LIBEVENT_THREAD *threads;
DISPATCHER_THREAD dispatcher_thread;
int last_thread = 0;
//-------------------------------------------------
unsigned short nPort = 9000;
struct event_base *pEventMgr = NULL;
void Reader(int sock, short event, void* arg)
{
fprintf(stderr, "reader ---------------%d\n", sock);
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
int ret = -1;
fprintf(stderr, "1 ---------------%d\n", sock);
ret = recv(sock, buffer, sizeof(buffer), 0);
fprintf(stderr, "2 ---------------%d\n", sock);
if (-1 == ret || 0 == ret)
{
printf("recv error:%s\n", strerror(errno));
return;
}
printf("recv data:%s\n", buffer);
//strcat(buffer,", hello,client\n");
//send(sock,buffer,strlen(buffer),0);
return;
}
static void thread_libevent_process(int fd, short which, void *arg)
{
int ret;
char buf[128];
LIBEVENT_THREAD* me = (LIBEVENT_THREAD*)arg;
int socket_fd = recv_fd(me->read_fd);
struct event *pReadEvent = NULL;
pReadEvent = (struct event *)malloc(sizeof(struct event));
event_assign(pReadEvent, me->base, socket_fd, EV_READ | EV_PERSIST, Reader, NULL);
event_add(pReadEvent, NULL);
return;
}
static void * worker_thread(void *arg)
{
LIBEVENT_THREAD* me = (LIBEVENT_THREAD*)arg;
me->tid = pthread_self();
event_base_loop(me->base, 0);
return NULL;
}
void ListenAccept(int sock, short event, void* arg)
{
printf("ListenAccept ................\n");
// 1,读 --也就是accept
struct sockaddr_in ClientAddr;
int nClientSocket = -1;
socklen_t ClientLen = sizeof(ClientAddr);
printf("---------------------------1\n");
nClientSocket = accept(sock, (struct sockaddr *)&ClientAddr, &ClientLen);
printf("---------------------------2,%d\n", nClientSocket);
if (-1 == nClientSocket)
{
printf("accet error:%s\n", strerror(errno));
return;
}
fprintf(stderr, "a client connect to server ....\n");
//进行数据分发
int tid = (last_thread + 1) % thread_num; //memcached中线程负载均衡算法
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
send_fd(thread->write_fd, nClientSocket);
return;
}
int main(int argc, char *argv[])
{
int nSocket = -1;
int nRet = -1;
nSocket = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == nSocket) //
{
printf("socket error:%s\n", strerror(errno));
return -1;
}
int value = 1;
setsockopt(nSocket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
struct sockaddr_in ServerAddr;
ServerAddr.sin_family = PF_INET;
ServerAddr.sin_port = htons(nPort);
ServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);
nRet = bind(nSocket, (struct sockaddr *)&ServerAddr, (socklen_t)sizeof(ServerAddr));
if (-1 == nRet)
{
printf("bind error:%s\n", strerror(errno));
return -1;
}
//int listen(int sockfd, int backlog);
nRet = listen(nSocket, 100);
if (-1 == nRet)
{
printf("listen error:%s\n", strerror(errno));
return -1;
}
printf("server begin listen ....\n");
//开始创建libvent
//--主线程只管监听socket,连接socket由工作线程来管理------------------------------------
//当有新的连接到来时,主线程就接受之并将新返回的连接socket派发给某个工作线程
//此后该新socket上的任何I/O操作都有被选中的工作线程来处理
//工作线程检测到管道上有数据可读
//--------------------------------------------------------------------------------------
int ret;
int i;
int fd[2];
pthread_t tid;
dispatcher_thread.base = event_init();
if (dispatcher_thread.base == NULL)
{
perror("event_init( base )");
return 1;
}
dispatcher_thread.tid = pthread_self();
threads = (LIBEVENT_THREAD *)calloc(thread_num, sizeof(LIBEVENT_THREAD));
if (threads == NULL)
{
perror("calloc");
return 1;
}
for (i = 0; i < thread_num; i++)
{
ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
if (ret == -1)
{
perror("socketpair()");
return 1;
}
threads[i].read_fd = fd[1];
threads[i].write_fd = fd[0];
threads[i].base = event_init();
if (threads[i].base == NULL)
{
perror("event_init()");
return 1;
}
//工作线程处理可读处理
event_set(&threads[i].event, threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
event_base_set(threads[i].base, &threads[i].event);
if (event_add(&threads[i].event, 0) == -1)
{
perror("event_add()");
return 1;
}
}
for (i = 0; i < thread_num; i++)
{
pthread_create(&tid, NULL, worker_thread, &threads[i]);
}
//2,创建具体的事件,
struct event ListenEvent;
//3, 把事件,套接字,libevent的管理器给管理起来, 也叫注册
//int event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *);
if (-1 == event_assign(&ListenEvent, dispatcher_thread.base, nSocket, EV_READ | EV_PERSIST, ListenAccept, NULL))
{
printf("event_assign error:%s\n", strerror(errno));
return -1;
}
// 4, 让我们注册的事件 可以被调度
if (-1 == event_add(&ListenEvent, NULL))
{
printf("event_add error:%s\n", strerror(errno));
return -1;
}
printf("libvent start run ...\n");
// 5,运行libevent
if (-1 == event_base_dispatch(dispatcher_thread.base))
{
printf("event_base_dispatch error:%s\n", strerror(errno));
return -1;
}
printf("---------------------------\n");
//--------------------------------------------------------------------------------------
return 0;
}