Libevent的事件驱动源码分析(三)

反应堆的设计

Posted by chensong on 2019-07-06 09:41:26

前言

网络编程基本都使用反应堆(reactor)模式。著名的C++准标准库Boost中的asio使用了reactor模式, 它的io线程是安全的; libevent的io处理也使用该模式。

正文

一, reactor模式

1, 采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。

在这里插入图片描述

Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理。

Handler:负责处理非阻塞的行为,标识系统管理的资源;同时将handler与事件绑定。

Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。

由于只有单个线程,所以处理器中的业务需要能够快速处理完。

2, 使用多线程处理业务逻辑。

在这里插入图片描述

将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程。

3,对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。

在这里插入图片描述

mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。

二, 在Reactor模式中,有5个关键的参与者。

1, 描述符(handle):

由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。

2, 同步事件分离器 (demultiplexer):epoll_wait

是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。Linux的select函数是一个经常被使用的分离器。

3, 事件处理器接口(event handler):callback

是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。具体的事件处理器:是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。

4, Reactor管理器(reactor):epoll_ctl

定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模板函数来处理这个事件。通过上述分析,我们注意到,是Reactor管理器而不是应用程序负责等待事件、分离事件和调度事件。实际上,Reactor管理器并没有被具体的事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件做出处理。这就是类似Hollywood原则的“反向控制”。应用程序要做的仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成。

三, libevent 的反应堆分析

流程图

在这里插入图片描述

1, 事件的回调函数和描述符(Reactor管理器)

事件的结构体

/*
 * One registered event: ties a descriptor (ev_fd) and the set of event types
 * of interest (ev_events) to a callback (ev_callback / ev_arg), and carries
 * the list linkage used by the owning event_base.
 */
struct event {
	/* linkage for the base's per-priority active queues */
	TAILQ_ENTRY(event) ev_active_next;
	/* linkage for the base's queue of inserted events */
	TAILQ_ENTRY(event) ev_next;
	/* for managing timeouts */
	union {
		TAILQ_ENTRY(event) ev_next_with_common_timeout;
		int min_heap_idx;	/* presumably this event's slot in the timeout min-heap -- confirm */
	} ev_timeout_pos;
	evutil_socket_t ev_fd;	/* descriptor handed to the callback as its first argument */

	struct event_base *ev_base;	/* base this event belongs to */
	/* union: only the member matching the kind of event is meaningful */
	union {
		/* used for io events */
		struct {
			TAILQ_ENTRY(event) ev_io_next;
			struct timeval ev_timeout;
		} ev_io;

		/* used by signal events */
		struct {
			TAILQ_ENTRY(event) ev_signal_next;
			short ev_ncalls;
			/* Allows deletes in callback */
			short *ev_pncalls;
		} ev_signal;
	} _ev;

	short ev_events;	/* requested event types (EV_READ, EV_WRITE, EV_SIGNAL, EV_PERSIST, EV_ET, ...) */
	short ev_res;		/* result passed to event callback */
	short ev_flags;		/* EVLIST_* bits recording which queues the event is on */
	ev_uint8_t ev_pri;	/* smaller numbers are higher priority */
	ev_uint8_t ev_closure;	/* EV_CLOSURE_* value selecting how the event is dispatched */
	struct timeval ev_timeout;

	/* allows us to adopt for different types of events */
	void (*ev_callback)(evutil_socket_t, short, void *arg);
	void *ev_arg;		/* opaque pointer passed to ev_callback as its last argument */
};

下面的代码把事件的文件描述符和回调函数注册到epoll中:

/*
 * Attach the I/O event `ev` to descriptor `fd` in the base's fd map and, if
 * this is the first reader and/or first writer on that fd, ask the backend
 * (evsel->add) to start watching the corresponding direction.
 *
 * Returns 1 when the backend was asked to add something, 0 when nothing had
 * to be added (or fd is negative), and -1 on failure.
 */
int evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
	const struct eventop *evsel = base->evsel;
	struct event_io_map *io = &base->io;
	struct evmap_io *ctx = NULL;
	struct event *first_ev;
	int readers, writers;
	int rc = 0;
	short prev_mask = 0;	/* directions already watched on this fd */
	short new_mask = 0;	/* directions that become watched with this call */

	EVUTIL_ASSERT(fd == ev->ev_fd);

	if (fd < 0)
		return 0;

#ifndef EVMAP_USE_HT
	/* Grow the slot array when fd lies beyond its current end. */
	if (fd >= io->nentries &&
	    evmap_make_space(io, fd, sizeof(struct evmap_io *)) == -1)
		return (-1);
#endif
	/* Look up the per-fd slot, constructing it on first use. */
	GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
	    evsel->fdinfo_len);

	readers = ctx->nread;
	writers = ctx->nwrite;

	if (readers != 0)
		prev_mask |= EV_READ;
	if (writers != 0)
		prev_mask |= EV_WRITE;

	/* Only the 0 -> 1 transition of a counter needs a backend change. */
	if ((ev->ev_events & EV_READ) && ++readers == 1)
		new_mask |= EV_READ;
	if ((ev->ev_events & EV_WRITE) && ++writers == 1)
		new_mask |= EV_WRITE;

	/* The counters are stored as 16-bit values below. */
	if (EVUTIL_UNLIKELY(readers > 0xffff || writers > 0xffff)) {
		event_warnx("Too many events reading or writing on fd %d",
		    (int)fd);
		return -1;
	}

	if (EVENT_DEBUG_MODE_IS_ON()) {
		first_ev = TAILQ_FIRST(&ctx->events);
		if (first_ev &&
		    (first_ev->ev_events & EV_ET) != (ev->ev_events & EV_ET)) {
			event_warnx("Tried to mix edge-triggered and non-edge-triggered"
			    " events on fd %d", (int)fd);
			return -1;
		}
	}

	if (new_mask) {
		/* Backend-private data sits directly after the evmap_io header. */
		void *extra = ((char*)ctx) + sizeof(struct evmap_io);

		/* XXX(niels): we cannot mix edge-triggered and
		 * level-triggered, we should probably assert on
		 * this. */
		if (evsel->add(base, ev->ev_fd, prev_mask,
			(ev->ev_events & EV_ET) | new_mask, extra) == -1)
			return (-1);
		rc = 1;
	}

	ctx->nread = (ev_uint16_t) readers;
	ctx->nwrite = (ev_uint16_t) writers;
	TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);

	return (rc);
}





// epoll add
/*
 * Backend "add" entry point used when no changelist is kept: builds a
 * one-off event_change describing which directions to add on `fd` and
 * applies it immediately.  `p` is not used.
 *
 * Returns whatever epoll_apply_one_change() returns.
 */
static int epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
    short old, short events, void *p)
{
	struct event_change ch;
	/* An "add" request that carries over the edge-triggered bit, if set. */
	const int add_change = EV_CHANGE_ADD | (events & EV_ET);

	ch.fd = fd;
	ch.old_events = old;
	ch.write_change = (events & EV_WRITE) ? add_change : 0;
	ch.read_change = (events & EV_READ) ? add_change : 0;

	return epoll_apply_one_change(base, base->evbase, &ch);
}

/*
 * Translate one pending change on ch->fd into a single epoll_ctl() call.
 *
 * The operation (ADD / MOD / DEL) and the epoll event mask are derived from
 * ch->read_change, ch->write_change and ch->old_events.  When the derived
 * mask is empty, nothing is sent to the kernel.
 *
 * Returns 0 on success, when there was nothing to do, or when the failure
 * was recovered from (a MOD<->ADD retry succeeded, or a DEL failed with
 * ENOENT/EBADF/EPERM); returns -1 otherwise.
 */
static int epoll_apply_one_change(struct event_base *base,
    struct epollop *epollop,
    const struct event_change *ch)
{
	struct epoll_event epev;
	int op, events = 0;

	if (1)
	{
		/* The logic here is a little tricky.  If we had no events set
		   on the fd before, we need to set op="ADD" and set
		   events=the events we want to add.  If we had any events set
		   on the fd before, and we want any events to remain on the
		   fd, we need to say op="MOD" and set events=the events we
		   want to remain.  But if we want to delete the last event,
		   we say op="DEL" and set events=the remaining events.  What
		   fun!
		*/

		/* TODO: Turn this into a switch or a table lookup. */

		if ((ch->read_change & EV_CHANGE_ADD) ||
		    (ch->write_change & EV_CHANGE_ADD))
		{
			/* If we are adding anything at all, we'll want to do
			 * either an ADD or a MOD. */
			events = 0;
			op = EPOLL_CTL_ADD;
			/* 1. Read side: watch EPOLLIN if it is being added, or
			 * if it was already watched and is not being deleted. */
			if (ch->read_change & EV_CHANGE_ADD)
			{
				events |= EPOLLIN;
			}
			else if (ch->read_change & EV_CHANGE_DEL)
			{
				;
			}
			else if (ch->old_events & EV_READ)
			{
				events |= EPOLLIN;
			}

			/* 2. Write side: same rule as above, for EPOLLOUT. */
			if (ch->write_change & EV_CHANGE_ADD) /* write is being added */
			{
				events |= EPOLLOUT;
			}
			else if (ch->write_change & EV_CHANGE_DEL)
			{
				;
			}
			else if (ch->old_events & EV_WRITE)
			{
				events |= EPOLLOUT;
			}

			/* 3. Edge-triggered if either change asked for EV_ET. */
			if ((ch->read_change | ch->write_change) & EV_ET)
			{
				events |= EPOLLET;
			}

			if (ch->old_events)
			{
				/* If MOD fails, we retry as an ADD, and if
				 * ADD fails we will retry as a MOD.  So the
				 * only hard part here is to guess which one
				 * will work.  As a heuristic, we'll try
				 * MOD first if we think there were old
				 * events and ADD if we think there were none.
				 *
				 * We can be wrong about the MOD if the file
				 * has in fact been closed and re-opened.
				 *
				 * We can be wrong about the ADD if the
				 * the fd has been re-created with a dup()
				 * of the same file that it was before.
				 */
				op = EPOLL_CTL_MOD;
			}
		}
		else if ((ch->read_change & EV_CHANGE_DEL) ||
		    (ch->write_change & EV_CHANGE_DEL))
		{
			/* If we're deleting anything, we'll want to do a MOD
			 * or a DEL. */
			op = EPOLL_CTL_DEL;
			/* 1. Read side is being deleted. */
			if (ch->read_change & EV_CHANGE_DEL)
			{
				/* What happens to the write side decides DEL vs MOD. */
				if (ch->write_change & EV_CHANGE_DEL) /* both directions deleted: DEL */
				{
					events = EPOLLIN|EPOLLOUT;
				}
				else if (ch->old_events & EV_WRITE) /* write stays watched: MOD down to EPOLLOUT */
				{
					events = EPOLLOUT;
					op = EPOLL_CTL_MOD;
				}
				else
				{
					events = EPOLLIN;
				}
			}
			else if (ch->write_change & EV_CHANGE_DEL) /* 2. only the write side is being deleted */
			{
				if (ch->old_events & EV_READ) /* read stays watched: MOD down to EPOLLIN */
				{
					events = EPOLLIN;
					op = EPOLL_CTL_MOD;
				}
				else
				{
					events = EPOLLOUT;  /* nothing else watched: DEL */
				}
			}
		}

		/* Empty mask: neither branch produced anything to apply, and
		 * op may be unset, so return before it is read. */
		if (!events)
		{
			return 0;
		}

		memset(&epev, 0, sizeof(epev));
		epev.data.fd = ch->fd;
		epev.events = events;
		if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1)
		{
			if (op == EPOLL_CTL_MOD && errno == ENOENT)
			{
				/* If a MOD operation fails with ENOENT, the
				 * fd was probably closed and re-opened.  We
				 * should retry the operation as an ADD.
				 */
				if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1)
				{
					event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
					    (int)epev.events, ch->fd);
					return -1;
				}
				else
				{
					event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
						(int)epev.events,
						ch->fd));
				}
			}
			else if (op == EPOLL_CTL_ADD && errno == EEXIST)
			{
				/* If an ADD operation fails with EEXIST,
				 * either the operation was redundant (as with a
				 * precautionary add), or we ran into a fun
				 * kernel bug where using dup*() to duplicate the
				 * same file into the same fd gives you the same epitem
				 * rather than a fresh one.  For the second case,
				 * we must retry with MOD. */
				if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1)
				{
					event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
					    (int)epev.events, ch->fd);
					return -1;
				}
				else
				{
					event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
						(int)epev.events,
						ch->fd));
				}
			}
			else if (op == EPOLL_CTL_DEL &&
			    (errno == ENOENT || errno == EBADF ||
				errno == EPERM))
			{
				/* If a delete fails with one of these errors,
				 * that's fine too: we closed the fd before we
				 * got around to calling epoll_dispatch. */
				event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
					(int)epev.events,
					ch->fd,
					strerror(errno)));
			}
			else
			{
				/* Any other failure is reported and propagated. */
				event_warn("Epoll %s(%d) on fd %d failed.  Old events were %d; read change was %d (%s); write change was %d (%s)",
				    epoll_op_to_string(op),
				    (int)epev.events,
				    ch->fd,
				    ch->old_events,
				    ch->read_change,
				    change_to_string(ch->read_change),
				    ch->write_change,
				    change_to_string(ch->write_change));
				return -1;
			}
		}
		else
		{
			event_debug(("Epoll %s(%d) on fd %d okay. [old events were %d; read change was %d; write change was %d]",
				epoll_op_to_string(op),
				(int)epev.events,
				(int)ch->fd,
				ch->old_events,
				ch->read_change,
				ch->write_change));
		}
	}
	return 0;
}

以上的流程都是将read和write事件注册到epoll中, 相当于Reactor模式中的管理器。

2, 同步事件分离器 (demultiplexer)


/*
 * Wait for I/O readiness with epoll_wait() and mark every event registered
 * on a ready descriptor as active.
 *
 * `tv` bounds the wait; NULL means wait without a timeout.  The base lock is
 * released for the duration of epoll_wait() and re-taken afterwards.
 *
 * Returns 0 on success or when the wait was interrupted (EINTR), and -1 on
 * any other epoll_wait() failure.
 */
static int epoll_dispatch(struct event_base *base, struct timeval *tv)
{
	struct epollop *ep = base->evbase;
	struct epoll_event *ready = ep->events;
	int idx, nready;
	long timeout = -1;

	if (tv != NULL) {
		timeout = evutil_tv_to_msec(tv);
		/* Linux kernels can wait forever if the timeout is
		 * too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */
		if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC)
			timeout = MAX_EPOLL_TIMEOUT_MSEC;
	}

	/* Flush queued registration changes before going to sleep. */
	epoll_apply_changes(base);
	event_changelist_remove_all(&base->changelist, base);

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	nready = epoll_wait(ep->epfd, ready, ep->nevents, timeout);

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	if (nready == -1) {
		/* Being interrupted by a signal is not an error. */
		if (errno == EINTR)
			return (0);

		event_warn("epoll_wait");
		return (-1);
	}

	event_debug(("%s: epoll_wait reports %d", __func__, nready));
	EVUTIL_ASSERT(nready <= ep->nevents);

	for (idx = 0; idx < nready; ++idx) {
		const int mask = ready[idx].events;
		short flags = 0;

		if (mask & (EPOLLHUP|EPOLLERR)) {
			/* Hang-up or error: report both directions. */
			flags = EV_READ | EV_WRITE;
		} else {
			if (mask & EPOLLIN)
				flags |= EV_READ;
			if (mask & EPOLLOUT)
				flags |= EV_WRITE;
		}

		/* Activate the events registered on this descriptor. */
		if (flags)
			evmap_io_active(base, ready[idx].data.fd, flags | EV_ET);
	}

	/* Every slot of the result array was used: double it (up to
	 * MAX_NEVENT) so the next call can report more events at once. */
	if (nready == ep->nevents && ep->nevents < MAX_NEVENT) {
		int doubled = ep->nevents * 2;
		struct epoll_event *grown;

		grown = mm_realloc(ep->events,
		    doubled * sizeof(struct epoll_event));
		/* On allocation failure the old buffer is simply kept. */
		if (grown) {
			ep->events = grown;
			ep->nevents = doubled;
		}
	}

	return (0);
}



/*
 * Activate every event registered on `fd` whose requested event types
 * overlap `events`.  Each matching event is activated with only the bits it
 * asked for.
 */
void evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)
{
	struct event_io_map *io = &base->io;
	struct evmap_io *ctx;
	struct event *ev;

#ifndef EVMAP_USE_HT
	EVUTIL_ASSERT(fd < io->nentries);
#endif
	/* Fetch the per-fd slot; it must already exist. */
	GET_IO_SLOT(ctx, io, fd, evmap_io);
	EVUTIL_ASSERT(ctx);

	TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {
		const int matched = ev->ev_events & events;

		if (matched)
			event_active_nolock(ev, matched, 1);
	}
}

/*
 * Mark `ev` as active with result flags `res`.  If it is already active,
 * the new flags are merged into ev_res and nothing else happens; otherwise
 * the event is put on its base's active queue.  `ncalls` is recorded for
 * signal events only.
 *
 * The base lock is asserted to be held (EVENT_BASE_ASSERT_LOCKED).
 */
void event_active_nolock(struct event *ev, int res, short ncalls)
{
	struct event_base *base;

	event_debug(("event_active: %p (fd %d), res %d, callback %p",
		ev, (int)ev->ev_fd, (int)res, ev->ev_callback));

	/* Already queued as active: just accumulate the result bits. */
	if ((ev->ev_flags & EVLIST_ACTIVE) != 0) {
		ev->ev_res |= res;
		return;
	}

	base = ev->ev_base;
	EVENT_BASE_ASSERT_LOCKED(base);

	ev->ev_res = res;

	if ((ev->ev_events & EV_SIGNAL) != 0) {
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		/* The event's callback is currently running and we are not
		 * the loop thread: register as a waiter and block on the
		 * condition variable. */
		if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
			++base->current_event_waiters;
			EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
		}
#endif
		ev->ev_ncalls = ncalls;
		ev->ev_pncalls = NULL;
	}

	/* Queue the event on the active list. */
	event_queue_insert(base, ev, EVLIST_ACTIVE);

	if (EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);
}

/*
 * Put `ev` on one of the base's queues and record that in ev->ev_flags.
 *
 * `queue` is one of EVLIST_INSERTED, EVLIST_ACTIVE or EVLIST_TIMEOUT.
 * Inserting an event that is already on the requested queue is silently
 * ignored for EVLIST_ACTIVE and reported through event_errx() for the
 * others; an unknown queue value is also reported through event_errx().
 *
 * The base lock is asserted to be held (EVENT_BASE_ASSERT_LOCKED).
 */
static void event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
	EVENT_BASE_ASSERT_LOCKED(base);

	if (ev->ev_flags & queue)
	{
		/* Double insertion is possible for active events */
		if (queue & EVLIST_ACTIVE)
		{
			return;
		}

		/* ev_fd is an evutil_socket_t, which need not have the width
		 * of int; cast it so the argument matches the %d conversion,
		 * as the other log calls in this file do. */
		event_errx(1, "%s: %p(fd %d) already on queue %x", __func__,
			   ev, (int)ev->ev_fd, queue);
		return;
	}

	/* Internal events are not included in the base's event count. */
	if (~ev->ev_flags & EVLIST_INTERNAL)
	{
		base->event_count++;
	}

	ev->ev_flags |= queue;
	switch (queue)
	{
	case EVLIST_INSERTED:
		TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
		break;
	case EVLIST_ACTIVE:
		/* Append to the active queue selected by the event's priority. */
		base->event_count_active++;
		TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
		    ev,ev_active_next);
		break;
	case EVLIST_TIMEOUT:
	{
		/* Common timeouts go on a shared list; any other timeout
		 * goes into the base's time heap. */
		if (is_common_timeout(&ev->ev_timeout, base))
		{
			struct common_timeout_list *ctl = get_common_timeout_list(base, &ev->ev_timeout);
			insert_common_timeout_inorder(ctl, ev);
		}
		else
		{
			min_heap_push(&base->timeheap, ev);
		}
		break;
	}
	default:
		event_errx(1, "%s: unknown queue %x", __func__, queue);
	}
}

以上是反应堆把事件保存到反应堆的队列中的过程, 下面分析队列的消费过程。

event_base_loop

/*
 * Run the event loop of `base`: repeatedly call the backend's dispatch
 * function, process expired timeouts, and run the active events.
 *
 * `flags` may contain EVLOOP_ONCE and/or EVLOOP_NONBLOCK.
 *
 * Returns 0 when the loop ends normally, 1 when it stops because no events
 * are registered, and -1 on a reentrant invocation or a dispatch failure.
 */
int event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	struct timeval wait_tv;
	struct timeval *wait_p;
	int dispatch_rc;
	int finished = 0;
	int retval = 0;

	/* Grab the lock.  We will release it inside evsel.dispatch, and again
	 * as we invoke user callbacks. */
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	if (base->running_loop) {
		event_warnx("%s: reentrant invocation.  Only one event_base_loop"
		    " can run on each event_base at once.", __func__);
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		return -1;
	}

	base->running_loop = 1;

	clear_time_cache(base);

	if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
		evsig_set_base(base);

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	base->th_owner_id = EVTHREAD_GET_ID();
#endif

	/* Start with both stop requests cleared. */
	base->event_break = 0;
	base->event_gotterm = 0;

	while (!finished) {
		/* Terminate the loop if we have been asked to */
		if (base->event_gotterm || base->event_break)
			break;

		timeout_correct(base, &wait_tv);

		wait_p = &wait_tv;
		if (N_ACTIVE_CALLBACKS(base) || (flags & EVLOOP_NONBLOCK)) {
			/*
			 * if we have active events, we just poll new events
			 * without waiting.
			 */
			evutil_timerclear(&wait_tv);
		} else {
			timeout_next(base, &wait_p);
		}

		/* If we have no events, we just exit */
		if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
			event_debug(("%s: no events registered.", __func__));
			retval = 1;
			goto done;
		}

		/* update last old time */
		gettime(base, &base->event_tv);

		clear_time_cache(base);

		/* Hand control to the backend's dispatch function. */
		dispatch_rc = evsel->dispatch(base, wait_p);
		if (dispatch_rc == -1) {
			event_debug(("%s: dispatch returned unsuccessfully.",
				__func__));
			retval = -1;
			goto done;
		}

		update_time_cache(base);

		/* Handle timeouts before running the active events. */
		timeout_process(base);

		if (N_ACTIVE_CALLBACKS(base)) {
			/* Run the events that are now active. */
			int handled = event_process_active(base);

			if ((flags & EVLOOP_ONCE) &&
			    N_ACTIVE_CALLBACKS(base) == 0 &&
			    handled != 0)
				finished = 1;
		} else if (flags & EVLOOP_NONBLOCK) {
			finished = 1;
		}
	}
	event_debug(("%s: asked to terminate loop.", __func__));

done:
	clear_time_cache(base);
	base->running_loop = 0;

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (retval);
}


/*
 * Run the events of the first non-empty active queue, scanning queues from
 * index 0 upwards, then run the deferred callbacks.
 *
 * Returns -1 as soon as a queue reports a negative result (the deferred
 * callbacks are not run in that case); otherwise returns the result of the
 * last queue that was processed, or 0 if every queue was empty.
 *
 * Caller must hold th_base_lock.
 */
static int event_process_active(struct event_base *base)
{
	int prio;
	int processed = 0;

	for (prio = 0; prio < base->nactivequeues; ++prio) {
		struct event_list *queue = &base->activequeues[prio];

		if (TAILQ_FIRST(queue) == NULL)
			continue;

		processed = event_process_active_single_queue(base, queue);
		if (processed < 0)
			return -1;

		/* Processed a real event; do not consider
		 * lower-priority events */
		if (processed > 0)
			break;

		/* If we get here, all of the events we processed
		 * were internal.  Continue. */
	}

	event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
	return processed;
}


static int event_process_active_single_queue(struct event_base *base,
    struct event_list *activeq)
{
	struct event *ev;
	int count = 0;

	EVUTIL_ASSERT(activeq != NULL);

	for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) 
	{
		if (ev->ev_events & EV_PERSIST)
		{
			event_queue_remove(base, ev, EVLIST_ACTIVE);
		}
		else
		{
			event_del_internal(ev);
		}
		if (!(ev->ev_flags & EVLIST_INTERNAL))
		{
			++count;
		}

		event_debug((
			 "event_process_active: event: %p, %s%scall %p",
			ev,
			ev->ev_res & EV_READ ? "EV_READ " : " ",
			ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
			ev->ev_callback));

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		base->current_event = ev;
		base->current_event_waiters = 0;
#endif

		switch (ev->ev_closure) 
		{
		case EV_CLOSURE_SIGNAL:
			event_signal_closure(base, ev);
			break;
		case EV_CLOSURE_PERSIST:
			event_persist_closure(base, ev);
			break;
		default:
		case EV_CLOSURE_NONE: // 回调函数处理
			EVBASE_RELEASE_LOCK(base, th_base_lock);
			(*ev->ev_callback)(
				(int)ev->ev_fd, ev->ev_res, ev->ev_arg);  //  调用业务的回调函数 
			break;
		}

		EVBASE_ACQUIRE_LOCK(base, th_base_lock);
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		base->current_event = NULL;
		if (base->current_event_waiters)
		{
			base->current_event_waiters = 0;
			// 通知 广播 main all thread wait事件 起来工作了 ^_^
			EVTHREAD_COND_BROADCAST(base->current_event_cond);
		}
#endif

		if (base->event_break)
		{
			return -1;
		}
	}
	return count;
}

结语