前言
基本功的修炼是阅读第三方库源码的基础。
正文
一,互斥锁和条件变量的使用
- pthread_mutex_lock()
- pthread_mutex_unlock()
- pthread_mutex_init();
- pthread_mutex_destroy()
案例一:
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
// Number of products currently available; read and modified by produce() and consumer().
int ready=0;
// Mutex that produce() and consumer() lock around every access to `ready`.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable: signalled by produce() after it increments `ready`,
// waited on by consumer() while `ready` is 0.
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
/*
 * Producer thread entry point.
 *
 * Loops forever. On each iteration it locks `mutex`, increments the shared
 * counter `ready`, prints the new value, signals `has_product`, unlocks the
 * mutex and then sleeps for 3 seconds.
 *
 * arg: read as a C string label; it is not used in the output.
 * The loop has no exit, so the function never returns.
 */
void *produce(void *arg) {
    char *label = (char *)arg;
    (void)label;
    while (1) {
        pthread_mutex_lock(&mutex);
        ready += 1;
        printf("生产者 生产 %d \n", ready);
        /* Signal while still holding the mutex, then release it. */
        pthread_cond_signal(&has_product);
        pthread_mutex_unlock(&mutex);
        sleep(3);
    }
}
/*
 * Consumer thread entry point.
 *
 * Loops forever. On each iteration it locks `mutex`, waits on `has_product`
 * for as long as `ready` is 0, then decrements `ready`, prints the remaining
 * count together with its own name, unlocks the mutex and sleeps 1 second.
 *
 * arg: C string with the consumer's name, printed with every item consumed.
 * The loop has no exit, so the function never returns.
 */
void *consumer(void *arg) {
    char *who = (char *)arg;
    while (1) {
        pthread_mutex_lock(&mutex);
        /* Re-test the predicate after every wake-up: several consumers wait
         * on the same condition, so another one may have taken the product. */
        while (!ready) {
            printf("没有产品了");
            pthread_cond_wait(&has_product, &mutex);
        }
        /* The mutex is held here (taken above and re-acquired by the wait). */
        ready -= 1;
        printf("消费者 %s 消费%d\n", who, ready);
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
}
/*
 * Starts one producer thread and two consumer threads, then joins them.
 *
 * The worker loops never terminate, so the joins block for the lifetime of
 * the process; the final message is only reached if a thread exits.
 *
 * Returns 0.
 */
int main(void) {
    /* `mutex` and `has_product` are already initialised by their static
     * initialisers; calling pthread_mutex_init / pthread_cond_init on them
     * again is undefined behaviour per POSIX, so it is not done here. */
    pthread_t tid1, tid2, tid3;

    /* The thread argument is a void *, so the string literals are cast explicitly. */
    pthread_create(&tid1, NULL, produce, (void *)"生产者");
    pthread_create(&tid2, NULL, consumer, (void *)"消费者1");
    pthread_create(&tid3, NULL, consumer, (void *)"消费者2");
    printf("开启线程");

    void *rval = NULL;
    pthread_join(tid1, &rval);
    pthread_join(tid2, &rval);
    pthread_join(tid3, &rval);
    /* rval is a pointer: print it with %p (passing it to %d is a format mismatch). */
    printf("线程结束%p\n", rval);
    return 0;
}
案例二:
//g++ -o cpthread cpthread.cpp -lpthread
/*************************************************************************
> File Name: pthread_cond_mutex.c
> Author: songli
> QQ: 2734030745
> Mail: 15850774503@163.com
> CSDN: http://my.csdn.net/Poisx
> github: https://github.com/chensongpoixs
> Created Time: Wed 25 Oct 2017 11:18:19 PM CST
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/times.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#define MUTEXCOND 1
// Singly linked list node holding one produced value.
typedef struct node
{
int data;
struct node *next;
}NODE;
// Head of the shared list. This is a file-scope global (not a stack variable);
// producer() pushes onto it and consumer() pops from it.
NODE *head = NULL;
#if MUTEXCOND
// Mutex locked by producer() and consumer() around their accesses to `head`.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable signalled by producer() after pushing a node and
// waited on by consumer() when the list is empty.
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#endif
/*
 * Producer thread: once per second pushes a node carrying a random value
 * (rand() % 5000) onto the front of the shared list `head`.
 *
 * When MUTEXCOND is non-zero the push is done under `mutex` and `cond` is
 * signalled afterwards to wake a waiting consumer.
 *
 * args: unused.
 * The loop has no exit, so the trailing pthread_exit() is never reached.
 */
void *producer(void *args)
{
    (void)args;
    while (1)
    {
        /* Allocate and fill the node before taking the lock, which keeps the
         * critical section short. A failed allocation is reported and retried
         * instead of dereferencing NULL. */
        NODE *pNode = (NODE *)malloc(sizeof(NODE));
        if (pNode == NULL)
        {
            perror("malloc");
            sleep(1);
            continue;
        }
        pNode->data = rand() % 5000;
#if MUTEXCOND
        pthread_mutex_lock(&mutex);
#endif
        printf("product :%d\n", pNode->data);
        /* Push onto the front of the list. */
        pNode->next = head;
        head = pNode;
#if MUTEXCOND
        pthread_mutex_unlock(&mutex);
        /* Wake one consumer waiting for the list to become non-empty. */
        pthread_cond_signal(&cond);
#endif
        sleep(1);
    }
    pthread_exit(NULL);
}
/*
 * Consumer thread: every 2 seconds pops the front node of the shared list
 * `head`, prints its value together with this consumer's id, and frees it.
 *
 * When MUTEXCOND is non-zero the pop is done under `mutex`, and the thread
 * waits on `cond` while the list is empty.
 *
 * args: pointer to an int holding this consumer's id.
 * The loop has no exit, so the trailing pthread_exit() is never reached.
 */
void *consumer(void *args)
{
    int id = *(int *)args;
    for (;;)
    {
#if MUTEXCOND
        pthread_mutex_lock(&mutex);
        /* Must be a loop, not an `if`: pthread_cond_wait may wake spuriously,
         * and with two consumers the other one may already have emptied the
         * list by the time this thread gets the mutex back. */
        while (head == NULL)
        {
            pthread_cond_wait(&cond, &mutex);
        }
#endif
        /* Detach the front node from the list. */
        NODE *pNode = head;
        head = pNode->next;
        printf("consumer id %d, :%d\n", id, pNode->data);
        free(pNode);
#if MUTEXCOND
        pthread_mutex_unlock(&mutex);
#endif
        sleep(2);
    }
    pthread_exit(NULL);
}
/*
 * Creates one producer and two consumer threads and joins them.
 *
 * Returns -1 if a thread cannot be created (after printing the reason),
 * otherwise 0 once all threads have been joined. The worker loops never
 * terminate, so in practice the joins block indefinitely.
 */
int main(int argc, char *argv[])
{
    pthread_t tpid1, tpid2, tpid3;
    int ret;
    /* Consumer ids. Each consumer receives a pointer to its element, so the
     * array has to stay alive while the threads run. */
    int sum[2] = { 1, 2 };

    (void)argc;
    (void)argv;

    /* `mutex` is already initialised by PTHREAD_MUTEX_INITIALIZER. Calling
     * pthread_mutex_init on an initialised mutex is undefined behaviour per
     * POSIX, so no second initialisation is performed here. */

    ret = pthread_create(&tpid1, NULL, producer, NULL);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }
    ret = pthread_create(&tpid2, NULL, consumer, (void *)&sum[0]);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }
    ret = pthread_create(&tpid3, NULL, consumer, (void *)&sum[1]);
    if (ret != 0)
    {
        printf("create pthread %s\n", strerror(ret));
        return -1;
    }

    /* Wait for the worker threads. */
    pthread_join(tpid1, NULL);
    pthread_join(tpid2, NULL);
    pthread_join(tpid3, NULL);
#if MUTEXCOND
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
#endif
    return 0;
}
二,读写锁的使用
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
// Read-write lock shared by the two threads: producer() takes it with
// pthread_rwlock_wrlock, consumer() with pthread_rwlock_rdlock.
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
/*
 * Writer thread: repeatedly takes `rwlock` as a writer, prints two markers
 * with a placeholder computation between them, releases the lock and sleeps
 * for 100 microseconds.
 *
 * args: pointer to an int id; it is read but not otherwise used.
 * The loop has no exit, so the trailing pthread_exit() is never reached.
 */
void *producer(void *args)
{
    int id = *(int *)args;
    (void)id;
    for (;;)
    {
        pthread_rwlock_wrlock(&rwlock);
        printf("陈丽");
        /* Placeholder for the work done under the write lock. */
        int value = 9;
        int copy = value;
        (void)copy;
        printf("杨艳\n");
        pthread_rwlock_unlock(&rwlock);
        usleep(100);
    }
    pthread_exit(NULL);
}
/*
 * Reader thread: repeatedly takes `rwlock` as a reader, prints two markers
 * with a placeholder computation between them, releases the lock and sleeps
 * for 100 microseconds.
 *
 * args: pointer to an int id; it is read but not otherwise used.
 * The loop has no exit, so the trailing pthread_exit() is never reached.
 */
void *consumer(void *args)
{
    int id = *(int *)args;
    (void)id;
    for (;;)
    {
        pthread_rwlock_rdlock(&rwlock);
        printf("re 王盼盼");
        /* Placeholder for the work done under the read lock. */
        int value = 9;
        int copy = value;
        (void)copy;
        printf("王蓉\n");
        pthread_rwlock_unlock(&rwlock);
        usleep(100);
    }
    pthread_exit(NULL);
}
/*
 * Creates one writer (producer) and one reader (consumer) thread that share
 * `rwlock`, then joins them and destroys the lock.
 *
 * Returns -1 if a thread cannot be created (after printing the reason),
 * otherwise 0. The worker loops never terminate, so the joins block.
 */
int main(int argc, char *argv[])
{
    pthread_t tpid1, tpid2;
    int ret;
    /* Thread ids. Each thread gets a pointer to its own element, so valid
     * indices are 0 and 1 only; index 2 would be past the end of the array. */
    int sum[] = { 1, 2 };

    (void)argc;
    (void)argv;

    /* `rwlock` is already initialised by PTHREAD_RWLOCK_INITIALIZER; POSIX
     * leaves re-initialising an initialised lock undefined, so
     * pthread_rwlock_init is not called again. */

    ret = pthread_create(&tpid1, NULL, producer, (void *)&sum[0]);
    if (ret != 0)
    {
        printf("create pthread 1: error:%s\n", strerror(ret));
        return -1;
    }
    ret = pthread_create(&tpid2, NULL, consumer, (void *)&sum[1]);
    if (ret != 0)
    {
        printf("create pthread 2: error:%s\n", strerror(ret));
        return -1;
    }

    /* Wait for the worker threads. */
    pthread_join(tpid1, NULL);
    pthread_join(tpid2, NULL);

    pthread_rwlock_destroy(&rwlock);
    return 0;
}
三,信号量
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <time.h>
#include <pthread.h>
#include <semaphore.h>
// Singly linked list node holding one produced value.
typedef struct node
{
int data;
struct node *next; // next node in the list
}NODE;
// Head of the list shared by producer() and consumer().
NODE *head = NULL;
// Counting semaphores used to pace the two threads:
// producer_sem is waited on by producer() before it adds a node and posted by
// consumer() after it removes one; main() initialises it to 5.
sem_t producer_sem;
// consumer_sem is posted by producer() after it adds a node and waited on by
// consumer() before it removes one; main() initialises it to 0.
sem_t consumer_sem;
/*
 * Producer thread: waits for a free slot on `producer_sem`, pushes a node
 * with a random value (rand() % 1000) onto the front of the shared list,
 * posts `consumer_sem` to announce it, then sleeps 1 second.
 *
 * args: unused.
 * The loop has no exit, so the trailing pthread_exit() is never reached.
 *
 * NOTE(review): `head` is also modified by consumer() and only the semaphores
 * order the two threads; no mutex protects the list itself. Confirm whether a
 * lock around the head update is required.
 */
void *producer(void *args)
{
    (void)args;
    while (1)
    {
        /* Reserve one slot. */
        sem_wait(&producer_sem);

        NODE *pNode = (NODE *)malloc(sizeof(NODE));
        if (pNode == NULL)
        {
            /* Nothing was produced: hand the slot back and retry later
             * instead of dereferencing NULL. */
            perror("malloc");
            sem_post(&producer_sem);
            sleep(1);
            continue;
        }
        pNode->data = rand() % 1000;
        /* Print from the local node before publishing it, so the value is
         * not read back through `head` after the node becomes visible. */
        printf("producer:%d\n", pNode->data);

        /* Push onto the front of the list. */
        pNode->next = head;
        head = pNode;

        /* Announce one more item to the consumer. */
        sem_post(&consumer_sem);
        sleep(1);
    }
    pthread_exit(NULL);
}
//消费者
void *consumer(void *args)
{
NODE * pNode = NULL;
while (1)
{
//添加生产
sem_wait(&consumer_sem);
pNode = head->next;
printf("consumer:%d\n", head->data);
head = pNode;
free(pNode);
pNode = NULL;
sem_post(&producer_sem);
sleep(2);
}
//通知主线程
pthread_exit(NULL);
}
int main(int argc, char *argv[])
{
//子线程
pthread_t tpid1, tpid2;
//初始化信号亮
sem_init(&producer_sem, 0, 5);
sem_init(&consumer_sem, 0, 0);
int ret;
//创建子线程
ret = pthread_create(&tpid1, NULL, producer, NULL);
if (ret != 0)
{
printf("create pthread :%s\n", strerror(ret));
return -1;
}
ret = pthread_create(&tpid2, NULL, consumer, NULL);
if (ret != 0)
{
printf("create pthread2 :%s\n", strerror(ret));
return -1;
}
//回收子线程
pthread_join(tpid1, NULL);
pthread_join(tpid2, NULL);
// 销毁信号亮
sem_destroy(&producer_sem);
sem_destroy(&consumer_sem);
return 0;
}
四, 设置线程的属性
在mqtt中设置线程属性
// gcc -o phtread_cancel pthread_cancel_join.c -lpthread
// Outline (pseudo-code) of how a thread controls its own cancelability.
// The `...` placeholders stand for application code, so this is not compilable as is.
// NOTE(review): a real pthread start routine has the type void *(*)(void *);
// this outline returns void.
void thread_function(void *arg)
{
/**
* The thread is about to do critical work during which it must not be
* cancelled, so it first sets its own cancel state to disabled with
* pthread_setcancelstate().
*/
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
/* do the critical work */
//...
/**
* The critical work is finished and the thread may be cancelled again:
* set the cancel state back to enabled with pthread_setcancelstate().
*/
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
/**
* Call pthread_testcancel() to check whether a cancellation request was
* sent to this thread while its cancel state was disabled.
* If there was one, the thread is cancelled (exits) here.
*/
pthread_testcancel();
/**
* pthread_testcancel() returned, so no cancellation request was pending;
* carry on with the remaining work.
* A request that arrives from now on takes effect at the next
* cancellation point (for example sleep(), read(), write(), ...).
*/
//...
/**
* From here on the function contains no more cancellation points, so a
* request could no longer take effect. Switch the cancel type to
* asynchronous so that a request cancels the thread immediately.
*/
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* code without cancellation points */
...
}
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
/* Reports a pthread-style error and terminates the process: stores the error
   number `en` in errno, prints `msg` with perror(), then calls
   exit(EXIT_FAILURE). The do { ... } while (0) wrapper lets the macro be used
   as a single statement, e.g. as the body of an unbraced `if`. */
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
/*
 * Thread body for the cancellation demo.
 *
 * Disables cancellation, sleeps 5 seconds, re-enables cancellation and then
 * sleeps for 1000 seconds. A cancellation request sent during the first sleep
 * is expected to take effect in the second sleep(), which is a cancellation
 * point. Any pthread_setcancelstate() failure is reported through
 * handle_error_en, which exits the process.
 *
 * ignored_argument: unused.
 * Returns NULL only if the thread was not cancelled.
 */
static void *
thread_func(void *ignored_argument)
{
    /* Keep cancellation off for a while so a request is not acted on at once. */
    int rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    if (rc != 0)
        handle_error_en(rc, "pthread_setcancelstate");

    printf("thread_func(): started; cancellation disabled\n");
    sleep(5);
    printf("thread_func(): about to enable cancellation\n");

    rc = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    if (rc != 0)
        handle_error_en(rc, "pthread_setcancelstate");

    /* sleep() is a cancellation point: the thread should be cancelled here. */
    sleep(1000);

    /* Only reached when no cancellation happened. */
    printf("thread_func(): not canceled!\n");
    return NULL;
}
/*
 * Cancellation demo driver: starts thread_func, waits 2 seconds, sends it a
 * cancellation request, joins it and reports whether its exit status is
 * PTHREAD_CANCELED. Any pthread call failure is reported through
 * handle_error_en, which exits the process. Exits with EXIT_SUCCESS.
 */
int main(int argc, char *argv[])
{
    pthread_t worker;
    void *exit_status;
    int rc;

    /* Start the thread that will be cancelled. */
    rc = pthread_create(&worker, NULL, &thread_func, NULL);
    if (rc != 0)
        handle_error_en(rc, "pthread_create");

    /* Give the thread a chance to get started. */
    sleep(2);

    printf("main(): sending cancellation request\n");
    rc = pthread_cancel(worker);
    if (rc != 0)
        handle_error_en(rc, "pthread_cancel");

    /* Join with the thread to obtain its exit status. */
    rc = pthread_join(worker, &exit_status);
    if (rc != 0)
        handle_error_en(rc, "pthread_join");

    if (exit_status != PTHREAD_CANCELED)
        printf("main(): thread wasn't canceled (shouldn't happen!)\n");
    else
        printf("main(): thread was canceled\n");

    exit(EXIT_SUCCESS);
}
五, boost的thread 的学习
1, 分析 boost 中的条件变量和信号量
boost/thread/condition_variable.hpp
中有区分平台的
我熟悉linux的api 所以我看boost对linux的pthread库的封装
boost/thread/pthread/目录下面是linux平台
了解 condition_variable 的使用
condition_variable这个封装 linux C中的
- pthread_mutex_init
- pthread_cond_init
- pthread_cond_wait
- pthread_cond_timedwait
- pthread_cond_signal
- pthread_cond_broadcast
- pthread_mutex_destroy
- pthread_cond_destroy
我们通常这样使用 condition_variable
// Typical use of boost::condition_variable with a predicate.
// `sum` is the shared state that the waiter checks.
int sum = 0;
boost::condition_variable m_cond;
boost::mutex m_mutex;
// The lock type is boost::unique_lock (the earlier spelling "uniquie" does not exist).
boost::unique_lock<boost::mutex> lock{m_mutex};
// wait(lock, pred) keeps waiting while pred() returns false, so this returns
// once `sum` is not greater than 34. The lambda captures by reference so that
// it can read `sum` wherever it is declared.
m_cond.wait(lock, [&]() { return sum <= 34; });
查看boost中wait的实现
// Blocks the caller on the condition variable `cond` using the internal mutex.
// m: the caller's lock object, handed to the lock_on_exit guard.
// Throws condition_error (via boost::throw_exception) when pthread_cond_wait
// returns a non-zero result.
template<typename lock_type>
void wait(lock_type& m)
{
int res=0;
{
// NOTE(review): presumably `guard` releases the caller's lock `m` on
// activate() and re-acquires it when it goes out of scope; confirm against
// thread_cv_detail::lock_on_exit.
thread_cv_detail::lock_on_exit<lock_type> guard;
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
// With interruption support the checker is constructed from both the internal mutex and the condition.
detail::interruption_checker check_for_interruption(&internal_mutex,&cond);
#else
// Without interruption support a scoped lock on the internal mutex is used under the same name.
boost::pthread::pthread_mutex_scoped_lock check_for_interruption(&internal_mutex);
#endif
guard.activate(m);
res=pthread_cond_wait(&cond,&internal_mutex); // the wait ultimately ends up in pthread_cond_wait
check_for_interruption.unlock_if_locked();
guard.deactivate();
}
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
this_thread::interruption_point();
#endif
// A non-zero result from pthread_cond_wait is reported as an exception.
if(res)
{
boost::throw_exception(condition_error(res, "boost::condition_variable_any::wait() failed in pthread_cond_wait"));
}
}
// Predicate overload: calls wait(m) repeatedly until pred() returns true.
// m: the caller's lock, passed through to wait(m).
// pred: callable taking no arguments whose result is tested as a boolean.
template<typename lock_type,typename predicate_type>
void wait(lock_type& m,predicate_type pred) // worth borrowing: the predicate can be a C++11 lambda expression
{
while (!pred()) // loop until the condition holds; once pred() is true, wait(m) is not called again
{
wait(m);
}
}
2, boost库中thread
找主要的接口
- pthread_create
- pthread_join
- pthread_detach
在 boost 库中你会发现, 下面这个 start_thread 函数是在 boost/asio 目录中实现的:
boost/asio/detail/ 和boost/asio/detail/impl目录下
boost/asio/detail/posix_thread.hpp 接口的定义
// Thread wrapper around a pthread_t.
// The constructor wraps the callable in a heap-allocated func<Function> and
// passes it to start_thread(); join() and the destructor are declared here and
// defined in the implementation file.
class posix_thread
: private noncopyable
{
public:
// Constructor. Copies `f` into a new func<Function> and starts the thread
// with it. The second (unnamed) parameter is not used.
template <typename Function>
posix_thread(Function f, unsigned int = 0)
: joined_(false)
{
start_thread(new func<Function>(f));
}
// Destructor.
BOOST_ASIO_DECL ~posix_thread();
// Wait for the thread to exit.
BOOST_ASIO_DECL void join();
// Get number of CPUs.
BOOST_ASIO_DECL static std::size_t hardware_concurrency();
private:
// The C-style thread entry function needs access to the private nested types below.
friend void* boost_asio_detail_posix_thread_function(void* arg);
// Type-erased interface for the callable that the thread runs.
class func_base
{
public:
virtual ~func_base() {}
virtual void run() = 0;
};
// Holder whose destructor deletes the func_base it points to.
struct auto_func_base_ptr
{
func_base* ptr;
~auto_func_base_ptr() { delete ptr; }
};
// Concrete func_base that stores a copy of the callable and invokes it in run().
template <typename Function>
class func
: public func_base
{
public:
func(Function f)
: f_(f)
{
}
virtual void run()
{
f_();
}
private:
Function f_;
};
// Starts the thread with `arg` as its callable.
BOOST_ASIO_DECL void start_thread(func_base* arg);
// Handle of the underlying thread.
::pthread_t thread_;
// Initialised to false by the constructor.
bool joined_;
};
boost/asio/detail/impl/posix_thread.ipp接口的实现start_thread 中调用pthread_create
// Destructor: a thread that was never joined is detached.
posix_thread::~posix_thread()
{
if (!joined_)
::pthread_detach(thread_);
}
// Waits for the thread to finish. The first call joins the thread and records
// that fact in joined_; later calls do nothing.
void posix_thread::join()
{
  if (joined_)
    return;

  ::pthread_join(thread_, 0);
  joined_ = true;
}
// Returns the value of sysconf(_SC_NPROCESSORS_ONLN) when that constant is
// defined and the call yields a positive number; otherwise returns 0.
std::size_t posix_thread::hardware_concurrency()
{
#if defined(_SC_NPROCESSORS_ONLN)
  const long online = sysconf(_SC_NPROCESSORS_ONLN);
  if (online >= 1)
    return online;
#endif // defined(_SC_NPROCESSORS_ONLN)
  return 0;
}
// Creates the underlying pthread with boost_asio_detail_posix_thread_function
// as entry point and `arg` as its argument.
// If pthread_create fails, `arg` is deleted here and the error code is passed
// to boost::asio::detail::throw_error.
void posix_thread::start_thread(func_base* arg)
{
  const int error = ::pthread_create(&thread_, 0,
      boost_asio_detail_posix_thread_function, arg);
  if (error == 0)
    return;

  // The thread was not created, so the callable has to be released here.
  delete arg;
  boost::system::error_code ec(error,
      boost::asio::error::get_system_category());
  boost::asio::detail::throw_error(ec, "thread");
}
void* boost_asio_detail_posix_thread_function(void* arg)
{
posix_thread::auto_func_base_ptr func = {
static_cast<posix_thread::func_base*>(arg) };
func.ptr->run();
return 0;
}
3, 库boost 封装thread分析
C++模板自动推导类型 和 {}
定义:
这里的 {} 是聚合初始化(aggregate initialization): 花括号中的值按声明顺序依次初始化结构体的成员, 并不是"不检查指针安全、只返回首地址"
// g++ -o cpthread cthread.cpp -lpthread -std=c++11 -g -Wall
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <cstddef>
// Handle of the single worker thread; written by start_thread() and joined in main().
pthread_t tid1;
// Demo callable run on the worker thread: prints its own function name and
// the source line of the printf call, then returns NULL.
void* callback()
{
printf("[%s][%d]\n", __FUNCTION__, __LINE__);
return NULL;
}
void* thread_function(void* arg);
// Type-erased interface for the callable executed by a thread.
// The destructor is virtual so that a derived object can be deleted through
// a func_base pointer.
class func_base
{
public:
    // Runs the wrapped callable.
    virtual void run() = 0;

    virtual ~func_base() = default;
};
// Minimal owning holder: the destructor deletes the func_base that `ptr`
// points to. The struct declares no constructor, so it is set up with
// aggregate initialization, i.e. `auto_func_base_ptr x = { p };`.
// NOTE(review): copying an instance would leave two holders deleting the same
// pointer; the code shown only ever creates one local instance.
struct auto_func_base_ptr
{
func_base* ptr;
~auto_func_base_ptr() { delete ptr; }
};
// Concrete func_base that stores a copy of a callable of type Function and
// invokes it, discarding any result, when run() is called.
template <typename Function>
class func
  : public func_base
{
private:
    Function f_;

public:
    // Copies the callable into the wrapper.
    func(Function f)
      : f_(f)
    {
    }

    // Invokes the stored callable with no arguments.
    void run() override
    {
        f_();
    }
};
void* thread_function(void* arg)
{
auto_func_base_ptr func = {
static_cast<func_base*>(arg) }; // 现在我还没有明白为什么要{}, 没有的话就报转换类型错误
func.ptr->run();
return 0;
}
// Starts thread_function on a new thread whose handle is stored in the global
// `tid1`, passing `arg` as its argument.
//
// On success the new thread becomes responsible for `arg` (thread_function
// deletes it after running it). If pthread_create fails, no thread will ever
// release `arg`, so it is deleted here, the error code is printed to stderr
// and the process exits with EXIT_FAILURE; main() joins `tid1`
// unconditionally, which would be invalid without a started thread.
void start_thread(func_base *arg)
{
    const int rc = pthread_create(&tid1, NULL, thread_function, arg);
    if (rc != 0)
    {
        delete arg;
        fprintf(stderr, "pthread_create failed: error %d\n", rc);
        exit(EXIT_FAILURE);
    }
}
template <typename Function>
void chen_pthread(Function f)
{
start_thread(new func<Function>(f));
}
// Runs callback() on a worker thread via chen_pthread(), waits for it to
// finish and prints the value the thread returned. Returns 0.
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    chen_pthread(callback);
    printf("开启线程");

    void *rval = NULL;
    pthread_join(tid1, &rval);
    // rval is a pointer: print it with %p (passing it to %d is a format mismatch).
    printf("线程结束%p\n", rval);
    return 0;
}
看一下 代码中使用 {}
// Aggregate initialization: auto_func_base_ptr declares no constructor, so the
// braces initialise its only data member `ptr` with the converted pointer.
auto_func_base_ptr func = {
static_cast<func_base*>(arg) }; // without the braces this is a conversion error: there is no constructor from func_base* to auto_func_base_ptr
上一次分析错误了, 没有分析到位
结构体 sizeof(auto_func_base_ptr) == 8 个字节
sizeof(func_base) == 8 个字节
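这里的 {} 是聚合初始化(aggregate initialization): auto_func_base_ptr 没有自定义构造函数, 花括号里的值按声明顺序初始化成员, 即把 static_cast 得到的 func_base* 赋给成员 ptr; 它并不是"不检查指针安全、只返回首地址"
func 是栈上的局部变量, 离开作用域时它的析构函数会 delete ptr, 从而释放线程函数对象
去掉 {} 写成 auto_func_base_ptr func = static_cast<func_base*>(arg); 会报类型转换错误, 因为不存在从 func_base* 到 auto_func_base_ptr 的转换构造函数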
结语
看别人的库时, 要有一些基本知识.