Libevent的事件驱动源码分析(二)

多线程支持

Posted by chensong on 2019-07-04 23:22:41

前言

libevent默认不是线程安全的, 而现在的服务器都是多核的, 要充分利用cpu就需要使用多线程。

正文

一, 谈谈libevent如何使用多线程呢

我们在上学的时候, 经常去学校的食堂吃饭。 有几种情况

1, 一个窗口排队等待打饭

一个窗口就相当于我们网络中io单线程的处理, 不会出现惊群效应(linux 3.0+ epoll这个问题已经解决了), 但是这个效率会比较低下, 学生排队的时间过长。

2, 多个窗口排队等待打饭

多个窗口排队等待打饭, 一个窗口就相当于我们一个线程, 处理的速度就会比较快的, 效率较高。

二, 谈谈libevent如何多线程中通信的

libevent使用两种方式通信

  1. 管道 (pipe)
  2. socket

1,我们一起看看怎么使用socket 通信的

evutil_socketpair 创建一对socket, 用于事件通知:

int (*notify)(struct event_base *) = evthread_notify_base_default; // io 线程的操作回调函数
base->th_notify_fn = notify;

evthread_notify_base_default这个函数就通知事件写入1个字节数据

接收线程:

void (*cb)(evutil_socket_t, short, void *) = evthread_notify_drain_default; // main thread 回调函数

放到事件队列中去了

evthread_notify_drain_default函数就修改变量is_notify_pending

2, 唤醒线程的流程

① event_add_internal
// Excerpt: if the event being added is a signal event that is also the
// base's current event, and EVBASE_IN_THREAD(base) is false for the caller,
// count the caller as a waiter and block on the condition variable.
if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
	&& !EVBASE_IN_THREAD(base)) 
{
	++base->current_event_waiters;
	// Wait on current_event_cond (paired with th_base_lock) until notified.
	EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
② event_del_internal
// Excerpt: if the event being deleted is the base's current event and
// EVBASE_IN_THREAD(base) is false for the caller, count the caller as a
// waiter and block on the condition variable.
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) 
{
	++base->current_event_waiters; // register this caller as a waiter
	// Wait on current_event_cond (paired with th_base_lock) until notified.
	EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
③ event_active_nolock
// Excerpt: if the event being activated is the base's current event and
// EVBASE_IN_THREAD(base) is false for the caller, count the caller as a
// waiter and block on the condition variable.
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) 
{
	++base->current_event_waiters;
	// Wait on current_event_cond (paired with th_base_lock) until notified.
	EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}

3, 广播的函数

① event_process_active_single_queue
// Excerpt: repeatedly take the first event of the active queue, run its
// callback, then wake any threads that registered as waiters on it.
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) 
{
	// Persistent events are only removed from the active list; every other
	// event goes through event_del_internal().
	if (ev->ev_events & EV_PERSIST)
	{
		event_queue_remove(base, ev, EVLIST_ACTIVE);
	}
	else
	{
		event_del_internal(ev);
	}
	// Only events without the EVLIST_INTERNAL flag are counted.
	if (!(ev->ev_flags & EVLIST_INTERNAL))
	{
		++count;
	}

	event_debug((
		 "event_process_active: event: %p, %s%scall %p",
		ev,
		ev->ev_res & EV_READ ? "EV_READ " : " ",
		ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
		ev->ev_callback));

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	// Publish the event that is about to run and reset the waiter count.
	base->current_event = ev;
	base->current_event_waiters = 0;
#endif

	// Dispatch according to the event's closure type.
	switch (ev->ev_closure) 
	{
	case EV_CLOSURE_SIGNAL:
		// NOTE(review): no lock release is visible on this path; presumably
		// event_signal_closure() handles it -- confirm in libevent.
		event_signal_closure(base, ev);
		break;
	case EV_CLOSURE_PERSIST:
		// NOTE(review): same assumption as above for event_persist_closure().
		event_persist_closure(base, ev);
		break;
	default:
	case EV_CLOSURE_NONE:
		// Release th_base_lock before calling the callback.
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		(*ev->ev_callback)(
			(int)ev->ev_fd, ev->ev_res, ev->ev_arg);  // invoke the user-supplied callback
		break;
	}

	// Re-take the lock before touching the base's shared state again.
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	base->current_event = NULL;
	if (base->current_event_waiters)
	{
		base->current_event_waiters = 0;
		// Broadcast on current_event_cond to wake every waiting thread.
		EVTHREAD_COND_BROADCAST(base->current_event_cond);
	}
#endif

	// Stop processing as soon as event_break is set.
	if (base->event_break)
	{
		return -1;
	}
}

三 , 多线程

上面的等待机制针对的是多个线程共用一个event_base的情况; 下面的例子中, 每个线程独有一个event_base, 并且每个event_base都启动一个线程来运行。

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/ip.h> 
#include <arpa/inet.h>

#include <event2/event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>

/*
 * Print `m` followed by the description of the current errno (via perror),
 * then terminate the process with EXIT_FAILURE.
 *
 * The do { } while (0) wrapper makes the macro behave as one statement, so it
 * is safe inside an unbraced if/else. The last line deliberately carries no
 * line-continuation backslash, so the macro ends with its own text instead of
 * swallowing whatever line follows it.
 */
#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while (0)


/*
 * Pass the open descriptor `send_fd` to the peer of `sock_fd` as SCM_RIGHTS
 * ancillary data.
 *
 * sock_fd: socket used as the transport (a UNIX-domain socket is required
 *          for SCM_RIGHTS).
 * send_fd: descriptor to hand over; it stays open in the caller.
 *
 * One dummy payload byte (value 0) accompanies the control message, because
 * the descriptor travels only in the ancillary data.
 * On failure the error is reported with perror() and the process exits with
 * EXIT_FAILURE.
 */
void send_fd(int sock_fd, int send_fd)
{
	/* The union aligns the control buffer suitably for struct cmsghdr. */
	union {
		char buf[CMSG_SPACE(sizeof(int))];
		struct cmsghdr align;
	} control;
	struct msghdr msg;
	struct iovec vec;
	struct cmsghdr *p_cmsg;
	char sendchar = 0;
	ssize_t ret;

	/* Zero everything so padding and unused fields are well defined. */
	memset(&control, 0, sizeof(control));
	memset(&msg, 0, sizeof(msg));

	vec.iov_base = &sendchar;
	vec.iov_len = sizeof(sendchar);

	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_iov = &vec;
	msg.msg_iovlen = 1;
	msg.msg_control = control.buf;
	msg.msg_controllen = sizeof(control.buf);
	msg.msg_flags = 0;

	p_cmsg = CMSG_FIRSTHDR(&msg);
	p_cmsg->cmsg_level = SOL_SOCKET;
	p_cmsg->cmsg_type = SCM_RIGHTS;
	p_cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	/* memcpy avoids storing through a possibly misaligned int pointer. */
	memcpy(CMSG_DATA(p_cmsg), &send_fd, sizeof(int));

	/* Retry when a signal interrupts the call before anything is sent. */
	do
	{
		ret = sendmsg(sock_fd, &msg, 0);
	} while (ret == -1 && errno == EINTR);

	if (ret != 1)
	{
		perror("sendmsg");
		exit(EXIT_FAILURE);
	}
}

/*
 * Receive one descriptor that the peer of `sock_fd` passed as SCM_RIGHTS
 * ancillary data, together with its single dummy payload byte.
 *
 * sock_fd: socket to read from (a UNIX-domain socket is required for
 *          SCM_RIGHTS).
 *
 * Returns the newly received descriptor (always >= 0).
 * Any failure terminates the process with EXIT_FAILURE after a message on
 * stderr: a recvmsg() error, a closed channel, a truncated or malformed
 * control message, or a message that carries no descriptor.
 */
int recv_fd(const int sock_fd)
{
	/* The union aligns the control buffer suitably for struct cmsghdr. */
	union {
		char buf[CMSG_SPACE(sizeof(int))];
		struct cmsghdr align;
	} control;
	struct msghdr msg;
	struct iovec vec;
	struct cmsghdr *p_cmsg;
	char recvchar = 0;
	int passed_fd = -1;
	ssize_t ret;

	memset(&control, 0, sizeof(control));
	memset(&msg, 0, sizeof(msg));

	vec.iov_base = &recvchar;
	vec.iov_len = sizeof(recvchar);

	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_iov = &vec;
	msg.msg_iovlen = 1;
	msg.msg_control = control.buf;
	msg.msg_controllen = sizeof(control.buf);
	msg.msg_flags = 0;

	/* Retry when a signal interrupts the call before anything arrives. */
	do
	{
		ret = recvmsg(sock_fd, &msg, 0);
	} while (ret == -1 && errno == EINTR);

	if (ret == -1)
	{
		perror("recvmsg");
		exit(EXIT_FAILURE);
	}
	if (ret != 1)
	{
		/* errno is not meaningful here, so do not go through perror(). */
		fprintf(stderr, "recvmsg: channel closed by peer\n");
		exit(EXIT_FAILURE);
	}

	/* Accept only a complete SCM_RIGHTS message carrying exactly one int. */
	p_cmsg = CMSG_FIRSTHDR(&msg);
	if (p_cmsg == NULL
		|| (msg.msg_flags & MSG_CTRUNC) != 0
		|| p_cmsg->cmsg_level != SOL_SOCKET
		|| p_cmsg->cmsg_type != SCM_RIGHTS
		|| p_cmsg->cmsg_len != CMSG_LEN(sizeof(int)))
	{
		fprintf(stderr, "no passed fd\n");
		exit(EXIT_FAILURE);
	}

	/* memcpy avoids loading through a possibly misaligned int pointer. */
	memcpy(&passed_fd, CMSG_DATA(p_cmsg), sizeof(passed_fd));
	if (passed_fd < 0)
	{
		fprintf(stderr, "no passed fd\n");
		exit(EXIT_FAILURE);
	}

	return passed_fd;
}

//-------------------------------------------------
/* Per-worker state: one entry of the `threads` array for each worker thread. */
typedef struct {
	pthread_t tid;            /* thread id, stored by the worker itself via pthread_self() */
	struct event_base *base;  /* event base created for this worker */
	struct event event;       /* read event registered on read_fd */
	int read_fd;              /* socketpair end the worker reads passed descriptors from */
	int write_fd;             /* socketpair end the dispatcher sends descriptors to */
}LIBEVENT_THREAD;

/* State of the dispatcher: the thread that runs main() and accepts connections. */
typedef struct {
	pthread_t tid;            /* id of the dispatcher thread, set in main() */
	struct event_base *base;  /* event base the listening socket is registered on */
}DISPATCHER_THREAD;


/* Number of worker threads, and the length of the `threads` array. */
const int thread_num = 10;

/* Array of per-worker state, allocated in main(). */
LIBEVENT_THREAD *threads;
/* State of the dispatcher thread. */
DISPATCHER_THREAD dispatcher_thread;
/* Index of the worker that received the most recent connection (round-robin cursor). */
int last_thread = 0;
//-------------------------------------------------

/* TCP port the server listens on. */
unsigned short  nPort = 9000;
/* NOTE(review): not referenced anywhere in the visible code -- confirm whether it is still needed. */
struct event_base *pEventMgr = NULL;

/*
 * Tear down one client connection: unregister its read event, close the
 * socket and release the event.
 *
 * ev must have been allocated with malloc() and set up with event_assign().
 */
static void release_client(int sock, struct event *ev)
{
	event_del(ev);
	close(sock);
	free(ev);
}

/*
 * Read callback for a client socket.
 *
 * sock:  the readable client socket.
 * event: the triggering event flags (unused).
 * arg:   the heap-allocated struct event registered for `sock`, or NULL.
 *
 * Received data is printed as text. On end-of-stream or a hard receive error
 * the connection is torn down through `arg`, so a persistent read event on a
 * dead socket does not fire forever. When `arg` is NULL the callback has no
 * handle on its own event, so it only reports the condition and returns.
 */
void Reader(int sock, short event, void* arg)
{
	struct event *self = (struct event *)arg;
	char buffer[1024];
	ssize_t ret;
	int saved_errno;

	(void)event;

	fprintf(stderr, "reader ---------------%d\n", sock);
	memset(buffer, 0, sizeof(buffer));

	fprintf(stderr, "1 ---------------%d\n", sock);
	/* Keep one byte free so the buffer stays NUL-terminated for "%s". */
	ret = recv(sock, buffer, sizeof(buffer) - 1, 0);
	/* Capture errno before any stdio call can overwrite it. */
	saved_errno = errno;
	fprintf(stderr, "2 ---------------%d\n", sock);

	if (ret > 0)
	{
		printf("recv data:%s\n", buffer);
		return;
	}

	if (ret == -1
		&& (saved_errno == EINTR || saved_errno == EAGAIN || saved_errno == EWOULDBLOCK))
	{
		/* Transient condition: keep the event and wait for the next wakeup. */
		return;
	}

	if (ret == 0)
	{
		printf("client closed connection:%d\n", sock);
	}
	else
	{
		printf("recv error:%s\n", strerror(saved_errno));
	}

	if (self != NULL)
	{
		release_client(sock, self);
	}
}

/*
 * Callback run on a worker's event base when its notification socket becomes
 * readable: receive the client descriptor handed over by the dispatcher and
 * register a persistent read event for it on this worker's base.
 *
 * fd, which: unused; the socket to read from is taken from the worker state.
 * arg:       the LIBEVENT_THREAD describing this worker.
 *
 * The new event is passed to Reader as its own callback argument, so that
 * Reader can unregister and free it when the connection ends. If the event
 * cannot be allocated or registered, the client socket is closed.
 */
static void thread_libevent_process(int fd, short which, void *arg)
{
	LIBEVENT_THREAD* me = (LIBEVENT_THREAD*)arg;
	struct event *pReadEvent;
	int socket_fd;

	(void)fd;
	(void)which;

	socket_fd = recv_fd(me->read_fd);

	pReadEvent = (struct event *)malloc(sizeof(struct event));
	if (pReadEvent == NULL)
	{
		perror("malloc");
		close(socket_fd);
		return;
	}

	if (-1 == event_assign(pReadEvent, me->base, socket_fd, EV_READ | EV_PERSIST, Reader, pReadEvent)
		|| -1 == event_add(pReadEvent, NULL))
	{
		fprintf(stderr, "cannot register read event for socket %d\n", socket_fd);
		free(pReadEvent);
		close(socket_fd);
		return;
	}
}

/*
 * Entry point of a worker thread.
 *
 * arg: the LIBEVENT_THREAD slot owned by this worker.
 *
 * Stores the thread's own id in its slot, then runs the worker's event base
 * loop with flags 0. Always returns NULL.
 */
static void * worker_thread(void *arg)
{
	LIBEVENT_THREAD *const self = (LIBEVENT_THREAD *)arg;

	self->tid = pthread_self();

	/* The result of the loop is not inspected, as before. */
	(void)event_base_loop(self->base, 0);

	return NULL;
}


void ListenAccept(int sock, short event, void* arg)
{
	printf("ListenAccept ................\n");
	// 1,读 --也就是accept
	struct sockaddr_in ClientAddr;
	int nClientSocket = -1;
	socklen_t ClientLen = sizeof(ClientAddr);
	printf("---------------------------1\n");
	nClientSocket = accept(sock, (struct sockaddr *)&ClientAddr, &ClientLen);
	printf("---------------------------2,%d\n", nClientSocket);
	if (-1 == nClientSocket)
	{
		printf("accet error:%s\n", strerror(errno));
		return;
	}
	fprintf(stderr, "a client connect to server ....\n");

	//进行数据分发
	int tid = (last_thread + 1) % thread_num;        //memcached中线程负载均衡算法
	LIBEVENT_THREAD *thread = threads + tid;
	last_thread = tid;
	send_fd(thread->write_fd, nClientSocket);

	return;
}


/*
 * Program entry point.
 *
 * 1. Creates a TCP listening socket on nPort (all interfaces).
 * 2. Creates the dispatcher's event base and thread_num workers, each with
 *    its own event base and a socketpair used to receive client descriptors.
 * 3. Starts the worker threads.
 * 4. Registers the listening socket on the dispatcher base and runs its loop;
 *    accepted connections are handed to the workers by ListenAccept.
 *
 * Returns 0 after the dispatcher loop ends, or -1 / 1 on a setup failure
 * (the same values the individual steps have always used).
 */
int main(int argc, char *argv[])
{
	int nSocket = -1;
	int nRet = -1;
	int value = 1;
	int ret;
	int i;
	int fd[2];
	pthread_t tid;
	struct sockaddr_in ServerAddr;
	struct event ListenEvent;

	(void)argc;
	(void)argv;

	nSocket = socket(AF_INET, SOCK_STREAM, 0);
	if (-1 == nSocket)
	{
		printf("socket error:%s\n", strerror(errno));
		return -1;
	}

	/* Not fatal: without SO_REUSEADDR a quick restart may fail to bind. */
	if (-1 == setsockopt(nSocket, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)))
	{
		printf("setsockopt error:%s\n", strerror(errno));
	}

	/* Zero the whole structure so the padding bytes are defined. */
	memset(&ServerAddr, 0, sizeof(ServerAddr));
	ServerAddr.sin_family = AF_INET;
	ServerAddr.sin_port = htons(nPort);
	ServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);

	nRet = bind(nSocket, (struct sockaddr *)&ServerAddr, (socklen_t)sizeof(ServerAddr));
	if (-1 == nRet)
	{
		printf("bind error:%s\n", strerror(errno));
		close(nSocket);
		return -1;
	}

	nRet = listen(nSocket, 100);
	if (-1 == nRet)
	{
		printf("listen error:%s\n", strerror(errno));
		close(nSocket);
		return -1;
	}

	printf("server begin listen ....\n");

	/*
	 * Threading model: the main (dispatcher) thread only watches the
	 * listening socket. Each new connection is accepted there and its
	 * descriptor is handed to one worker; from then on all I/O on that
	 * connection is handled by the chosen worker, which is woken when its
	 * notification socket becomes readable.
	 */
	dispatcher_thread.base = event_base_new();
	if (dispatcher_thread.base == NULL) 
	{
		fprintf(stderr, "event_base_new( base ) failed\n");
		close(nSocket);
		return 1;
	}
	dispatcher_thread.tid = pthread_self();

	threads = (LIBEVENT_THREAD *)calloc(thread_num, sizeof(LIBEVENT_THREAD));
	if (threads == NULL) 
	{
		perror("calloc");
		close(nSocket);
		return 1;
	}

	for (i = 0; i < thread_num; i++)
	{
		ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
		if (ret == -1) 
		{
			perror("socketpair()");
			return 1;
		}

		threads[i].read_fd = fd[1];
		threads[i].write_fd = fd[0];

		/* A private base per worker; no global "current base" is involved. */
		threads[i].base = event_base_new();
		if (threads[i].base == NULL) 
		{
			fprintf(stderr, "event_base_new() failed\n");
			return 1;
		}

		/* Wake the worker whenever the dispatcher passes it a descriptor. */
		if (event_assign(&threads[i].event, threads[i].base, threads[i].read_fd,
				EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]) == -1)
		{
			fprintf(stderr, "event_assign() failed\n");
			return 1;
		}
		if (event_add(&threads[i].event, NULL) == -1) 
		{
			perror("event_add()");
			return 1;
		}
	}

	for (i = 0; i < thread_num; i++)
	{
		/* pthread_create reports failure through its return value, not errno. */
		ret = pthread_create(&tid, NULL, worker_thread, &threads[i]);
		if (ret != 0)
		{
			fprintf(stderr, "pthread_create error:%s\n", strerror(ret));
			return 1;
		}
	}

	/* Register the listening socket on the dispatcher's base. */
	if (-1 == event_assign(&ListenEvent, dispatcher_thread.base, nSocket, EV_READ | EV_PERSIST, ListenAccept, NULL))
	{
		printf("event_assign error:%s\n", strerror(errno));
		return -1;
	}

	/* Make the registered event eligible for dispatch. */
	if (-1 == event_add(&ListenEvent, NULL))
	{
		printf("event_add error:%s\n", strerror(errno));
		return -1;
	}

	printf("libvent start run ...\n");
	/* Run the dispatcher's event loop. */
	if (-1 == event_base_dispatch(dispatcher_thread.base))
	{
		printf("event_base_dispatch error:%s\n", strerror(errno));
		return -1;
	}

	printf("---------------------------\n");

	return 0;
}

结语