linux多线程同步

boost库的多线程分析使用

Posted by chensong on 2017-10-26 00::32::39

前言

基本功的修炼 是看第三方库基础

正文

一,互斥锁和添加变量的使用

  1. pthread_mutex_lock()
  2. phread_mutex_unlock()
  3. pthread_mutex_init();
  4. pthread_mutex_destroy()

案例一:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
// 产品队列
int ready=0;

//互斥锁
pthread_mutex_t  mutex = PTHREAD_MUTEX_INITIALIZER;

//条件变量
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;

//生产者

void * produce(void *arg){

	char* name=(char*)arg;
	for(;;){
	//锁住
	  pthread_mutex_lock(&mutex);
	  ready++;
	  printf("生产者  生产 %d \n",ready);
	  //发送一条解锁消息
	 pthread_cond_signal(&has_product);
	//解锁
	pthread_mutex_unlock(&mutex);
	  sleep(3);
	}
}

//消费者
void* consumer(void* arg){
	char* name=(char*)arg;
	for(;;){
	 pthread_mutex_lock(&mutex);
	//如果产品列表为空   继续等待不可能只有1个消费者
	   while(ready==0){
		printf("没有产品了");
		pthread_cond_wait(&has_product,&mutex);
	   }
		 //加锁
      	   ready--;
	   printf("消费者 %s  消费%d\n",name,ready);
	    pthread_mutex_unlock(&mutex);
	   sleep(1);
	}


}
void main(){
	pthread_mutex_init(&mutex,NULL);

	pthread_cond_init(&has_product,NULL);
	  //线程的引用
        pthread_t tid1,tid2,tid3;
        pthread_create(&tid1,NULL,produce,"生产者");
	pthread_create(&tid2,NULL,consumer,"消费者1");
	pthread_create(&tid3,NULL,consumer,"消费者2");

	printf("开启线程");
        void *rval;
        pthread_join(tid1,&rval);
	pthread_join(tid2,&rval);
	pthread_join(tid3,&rval);

        printf("线程结束%d\n",rval);

}


案例二:

//g++ -o cpthread cpthread.cpp -lpthread
/*************************************************************************
	> File Name: pthread_cond_mutex.c
	> Author: songli
	> QQ: 2734030745
	> Mail: 15850774503@163.com
    > CSDN: http://my.csdn.net/Poisx
	> github: https://github.com/chensongpoixs
	> Created Time: Wed 25 Oct 2017 11:18:19 PM CST
 ************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/times.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

#define MUTEXCOND 1

typedef struct node
{
    int data;
    struct node *next;
}NODE;

//栈区的全局变量
NODE *head = NULL;
#if MUTEXCOND
//互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#endif

//生产者
void *producer(void *args)
{
    NODE *pNode = NULL;
    while (1)
    {
#if MUTEXCOND
        //加锁
        pthread_mutex_lock(&mutex);
#endif 
        //开辟空间
        pNode = (NODE *)malloc(sizeof(NODE));
        pNode->data = rand() % 5000; //随机数
        printf("product :%d\n", pNode->data);
        //改变头结点
        pNode->next = head;
        head = pNode;
#if MUTEXCOND
        //解锁
        pthread_mutex_unlock(&mutex);
        //通知消费者进程
        pthread_cond_signal(&cond);
#endif 
        sleep(1);
    }
    //子线程退出通知主线程
    pthread_exit(NULL);
}

//消费者
void *consumer(void *args)
{
    int id = *(int *)args;
    NODE *pNode = NULL;
    for (;;)
    {
#if MUTEXCOND
        //加锁
        pthread_mutex_lock(&mutex);
        if (head == NULL)
        {
            pthread_cond_wait(&cond, &mutex);
        }
#endif 
        //接收数据
        pNode = head;
        head = pNode->next;
        printf("consumer id %d,  :%d\n", id, pNode->data);
        //释放内存
        free(pNode);
        pNode = NULL;
#if MUTEXCOND
        //解锁
        pthread_mutex_unlock(&mutex);
#endif 
        sleep(2);
    }
    //线程退出通知主线程
    pthread_exit(NULL);
}



int main(int argc, char *argv[])
{
    //子线程id
    pthread_t tpid1, tpid2, tpid3;
#if MUTEXCOND
    //初始化 互斥锁
    pthread_mutex_init(&mutex, NULL);
#endif
    int ret;
    int sum[2] = { 1, 2 };

    //创建子线程
    ret = pthread_create(&tpid1, NULL, producer, NULL);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }
    ret = pthread_create(&tpid2, NULL, consumer, (void *)&sum[0]);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }
    ret = pthread_create(&tpid3, NULL, consumer, (void *)&sum[1]);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }
    
    
    //接收子线程退出
    pthread_join(tpid1, NULL);
    pthread_join(tpid2, NULL);
    pthread_join(tpid3, NULL);
    
    
#if MUTEXCOND
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

#endif
    
	return 0;
}

二,读写锁的使用

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>


//读写锁
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

//生产者
void *producer(void *args)
{
	int id = *(int *)args;
	while (1)
	{
		//加读写锁
		pthread_rwlock_wrlock(&rwlock);

		printf("陈丽");
		int sum = 9;
		int q = sum;
		printf("杨艳\n");
		pthread_rwlock_unlock(&rwlock);
		usleep(100);
	}
	//退出子线程操作
	pthread_exit(NULL);
}

//消费者
void *consumer(void *args)
{
	int id = *(int *)args;
	while (1)
	{
		//加读写锁
		pthread_rwlock_rdlock(&rwlock);

		printf("re 王盼盼");
		int sum = 9;
		int q = sum;
		printf("王蓉\n");
		pthread_rwlock_unlock(&rwlock);
		usleep(100);
	}
	//子线程退出
	pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
	//线程pid
	pthread_t tpid1, tpid2;


	//初始化读写锁
	pthread_rwlock_init(&rwlock, NULL);
	int ret;
	int sum[] = { 1, 2 };
	//创建线程
	ret = pthread_create(&tpid1, NULL, producer, (void *)&sum[1]); 
	if (ret != 0)
	{
		printf("create pthread 1: error:%s\n", strerror(ret));
		return -1;
	}
	ret = pthread_create(&tpid2, NULL, consumer, (void *)&sum[2]); 
	if (ret != 0)
	{
		printf("create pthread 2: error:%s\n", strerror(ret));
		return -1;
	}

	//等待子线程回收
	pthread_join(tpid1, NULL);
	pthread_join(tpid2, NULL);

	//销毁读写锁·
	pthread_rwlock_destroy(&rwlock);
	
	
	return 0;
}


三,信号亮

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>


//数据
typedef struct node
{
	int data;
	struct node *next; //注意的
}NODE;


NODE *head = NULL;
//信号亮的使用
sem_t producer_sem;
sem_t consumer_sem;


//生产者
void *producer(void *args)
{
	NODE * pNode = NULL;
	while (1)
	{
		//添加生产
		sem_wait(&producer_sem);
		//添加数据
		pNode = (NODE *)malloc(sizeof(NODE));
		pNode->data = rand() % 1000;
		//改变指针的指向
		pNode->next = head;
		head = pNode;
		printf("producer:%d\n", head->data);
		sem_post(&consumer_sem);
		sleep(1);
	}
	//通知主线程
	pthread_exit(NULL);
}
//消费者
void *consumer(void *args)
{
	NODE * pNode = NULL;
	while (1)
	{
		//添加生产
		sem_wait(&consumer_sem);

		pNode = head->next;
		printf("consumer:%d\n", head->data); 
		head = pNode;	
		free(pNode);
		pNode = NULL;
		sem_post(&producer_sem);
		sleep(2);
	}
	//通知主线程
	pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	//子线程
	pthread_t tpid1, tpid2;
	
	//初始化信号亮
	sem_init(&producer_sem, 0, 5);
	sem_init(&consumer_sem, 0, 0);

	int ret;
	//创建子线程
	ret = pthread_create(&tpid1, NULL, producer, NULL);
	if (ret != 0)
	{
		printf("create pthread :%s\n", strerror(ret));
		return -1;
	}
	ret = pthread_create(&tpid2, NULL, consumer, NULL);
	if (ret != 0)
	{
		printf("create pthread2 :%s\n", strerror(ret));
		return -1;
	}


	//回收子线程
	pthread_join(tpid1, NULL);
	pthread_join(tpid2, NULL);


	// 销毁信号亮
	sem_destroy(&producer_sem);
	sem_destroy(&consumer_sem);


	return 0;
}

四, 设置线程的属性 

在mqtt中设置线程属性

// gcc -o phtread_cancel pthread_cancel_join.c -lpthread
// 设置线程是否退出的属性
void thread_function(void *arg)
{
    /**
    * 线程准备执行一些关键工作,在这个过程中不希望被取消。
    * 所以先通过pthread_setcancelstate()将本线程的cancel state
    * 设为disabled。
    */
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    /* 执行关键工作 */
    //...
    /**
    * 关键工作执行完成,可以被取消。
    * 通过pthread_setcancelstate()将本线程的cancel state
    * 设为enabled。
    */
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    /**
    * 调用pthread_testcancel()函数,检查一下在cancel state
    * 为disabled状态的时候,是否有取消请求发送给本线程。
    * 如果有的话就取消(退出)。
    */
    pthread_testcancel();
    /**
    * pthread_testcancel()返回了,表明之前没有取消请求发送给本线程,
    * 继续其余的工作。
    * 这时候如果有取消请求发送给本线程,会在下一次执行到
    * cancellation point的时候(例如sleep(), read(), write(), ...)时取消。
    */
    //...
    /**
    * 从这里开始,函数里不再包含cancellation point了。
    * 如果收到取消请求,将无法取消。所以先把本线程的cancel type
    * 设为asynchronous,收到取消请求将立即取消。
    */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    /* 不包含cancellation point的代码 */
    ...
}
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

#define handle_error_en(en, msg) \
       do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

static void *
thread_func(void *ignored_argument)
{
   int s;

   /* Disable cancellation for a while, so that we don't
      immediately react to a cancellation request */

   s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
   if (s != 0)
       handle_error_en(s, "pthread_setcancelstate");

   printf("thread_func(): started; cancellation disabled\n");
   sleep(5);
   printf("thread_func(): about to enable cancellation\n");

   s = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
   if (s != 0)
       handle_error_en(s, "pthread_setcancelstate");

   /* sleep() is a cancellation point */

   sleep(1000);        /* Should get canceled while we sleep */

   /* Should never get here */

   printf("thread_func(): not canceled!\n");
   return NULL;
}


int main(int argc, char *argv[])
{
   pthread_t thr;
   void *res;
   int s;

   /* Start a thread and then send it a cancellation request */

   s = pthread_create(&thr, NULL, &thread_func, NULL);
   if (s != 0)
       handle_error_en(s, "pthread_create");

   sleep(2);           /* Give thread a chance to get started */

   printf("main(): sending cancellation request\n");
   s = pthread_cancel(thr);
   if (s != 0)
       handle_error_en(s, "pthread_cancel");

   /* Join with thread to see what its exit status was */

   s = pthread_join(thr, &res);
   if (s != 0)
       handle_error_en(s, "pthread_join");

   if (res == PTHREAD_CANCELED)
       printf("main(): thread was canceled\n");
   else
       printf("main(): thread wasn't canceled (shouldn't happen!)\n");
   exit(EXIT_SUCCESS);
}



五, boost的thread 的学习

1, 分析 boost 中条件变量 信号亮

boost/thread/condition_variable.hpp

中有区分平台的

我熟悉linux的api 所以我看boost对linux的pthread库的封装

boost/thread/pthread/目录下面是linux平台

了解 condition_variable 的使用

condition_variable这个封装 linux C中的

  1. pthread_mutex_init
  2. pthread_cond_init
  3. pthread_cond_wait
  4. pthread_cond_timedwait
  5. pthread_cond_signal
  6. pthread_cond_broadcast
  7. pthread_mutex_destroy
  8. pthread_cond_destroy

正常我们地使用 condition_variable

int sum = 0;
boost::condition_variable m_cond;
boost::mutex m_mutex;
boost::uniquie<boost::mutex> lock{m_mutex};

m_cond.wait(lock, []() { return sum > 34 ? false:true; });

查看boost中wait的实现

  template<typename lock_type>
        void wait(lock_type& m)
        {
            int res=0;
            {
                thread_cv_detail::lock_on_exit<lock_type> guard;
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
                detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
#else
                boost::pthread::pthread_mutex_scoped_lock check_for_interruption(&internal_mutex);
#endif
                guard.activate(m);
                res=pthread_cond_wait(&cond,&internal_mutex);  //--- 最终调用 pthread_cond_wait函数
                check_for_interruption.unlock_if_locked();
                guard.deactivate();
            }
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
            this_thread::interruption_point();
#endif
            if(res)
            {
                boost::throw_exception(condition_error(res, "boost::condition_variable_any::wait() failed in pthread_cond_wait"));
            }
        }

        template<typename lock_type,typename predicate_type>
        void wait(lock_type& m,predicate_type pred)  // 我们可以借鉴 boost中 C++11 中高级模块的使用Lambda函数与表示式 的这个使用方法
        {
            while (!pred())  // 判断条件 当什么时候条件 满足就不调用了  
            {
                wait(m);
            }
        }

2, boost库中thread

找主要的接口

  1. pthread_create
  2. pthread_join
  3. pthread_detach

在boost库中你会发现thread中的start_thread函数的在boost/asio目录中的实现的

boost/asio/detail/ 和boost/asio/detail/impl目录下

boost/asio/detail/posix_thread.hpp 接口的定义

class posix_thread
  : private noncopyable
{
public:
  // Constructor.
  template <typename Function>
  posix_thread(Function f, unsigned int = 0)
    : joined_(false)
  {
    start_thread(new func<Function>(f));
  }

  // Destructor.
  BOOST_ASIO_DECL ~posix_thread();

  // Wait for the thread to exit.
  BOOST_ASIO_DECL void join();

  // Get number of CPUs.
  BOOST_ASIO_DECL static std::size_t hardware_concurrency();

private:
  friend void* boost_asio_detail_posix_thread_function(void* arg);

  class func_base
  {
  public:
    virtual ~func_base() {}
    virtual void run() = 0;
  };

  struct auto_func_base_ptr
  {
    func_base* ptr;
    ~auto_func_base_ptr() { delete ptr; }
  };

  template <typename Function>
  class func
    : public func_base
  {
  public:
    func(Function f)
      : f_(f)
    {
    }

    virtual void run()
    {
      f_();
    }

  private:
    Function f_;
  };

  BOOST_ASIO_DECL void start_thread(func_base* arg);

  ::pthread_t thread_;
  bool joined_;
};

boost/asio/detail/impl/posix_thread.ipp接口的实现start_thread 中调用pthread_create


posix_thread::~posix_thread()
{
  if (!joined_)
    ::pthread_detach(thread_);
}

void posix_thread::join()
{
  if (!joined_)
  {
    ::pthread_join(thread_, 0);
    joined_ = true;
  }
}

std::size_t posix_thread::hardware_concurrency()
{
#if defined(_SC_NPROCESSORS_ONLN)
  long result = sysconf(_SC_NPROCESSORS_ONLN);
  if (result > 0)
    return result;
#endif // defined(_SC_NPROCESSORS_ONLN)
  return 0;
}

void posix_thread::start_thread(func_base* arg)
{
  int error = ::pthread_create(&thread_, 0,
        boost_asio_detail_posix_thread_function, arg);
  if (error != 0)
  {
    delete arg;
    boost::system::error_code ec(error,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "thread");
  }
}

void* boost_asio_detail_posix_thread_function(void* arg)
{
  posix_thread::auto_func_base_ptr func = {
      static_cast<posix_thread::func_base*>(arg) };
  func.ptr->run();
  return 0;
}

3, 库boost 封装thread分析

C++模板自动推导类型 和 {}

定义:

{} 是不检查指针安全的, 只是返回首地址

// g++ -o cpthread cthread.cpp -lpthread -std=c++11 -g -Wall
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <cstddef>

pthread_t tid1;

void* callback()
{
	printf("[%s][%d]\n", __FUNCTION__, __LINE__);
	return NULL;
}

void* thread_function(void* arg);

class func_base
{
public:
  virtual ~func_base() {}
  virtual void run() = 0;
};

struct auto_func_base_ptr
{
  func_base* ptr;
  ~auto_func_base_ptr() { delete ptr; }
};

template <typename Function>
class func
  : public func_base
{
public:
  func(Function f)
    : f_(f)
  {
  }

  virtual void run()
  {
    f_();
  }

private:
  Function f_;
};

void* thread_function(void* arg)
{
  auto_func_base_ptr func = {
      static_cast<func_base*>(arg) }; // 现在我还没有明白为什么要{}, 没有的话就报转换类型错误 
  func.ptr->run();
  return 0;
}

void start_thread(func_base *arg)
{
	pthread_create(&tid1,NULL,
	thread_function, arg);
}

template <typename Function>
void	chen_pthread(Function f)
{
	start_thread(new func<Function>(f));
}

int main(int argc, char *argv[])
{
	
	//void start_thread(func_base* arg);

	chen_pthread(callback);
	
	printf("开启线程");
    
	void *rval;
    pthread_join(tid1,(void **)&rval);
	
    printf("线程结束%d\n", rval);

	return 0;
}

看一下 代码中使用 {}

 auto_func_base_ptr func = {
      static_cast<func_base*>(arg) }; // 现在我还没有明白为什么要{}, 没有的话就报转换类型错误 
 

上一次分析错误了, 没有分析到位

结构体 sizeof(auto_func_base_ptr) == 8 个字节

sizeof(func_base) == 8 个字节

使用{} 是不检查指针安全的, 只是返回首地址

赋指针地址 到变量auto_func_base_ptr上栈的变量

栈上的变量 赋值 和指针变量赋值 使用 {}

结语

看别人的库时, 要有一些基本知识.