更多课程 选择中心

C/C++培训
达内IT学院

400-996-5531

C++开发-条件变量整理

  • 发布:C++培训
  • 来源:网络
  • 时间:2019-01-15 13:11

后台C++开发你一定要知道的条件变量

今天因为工作需要,需要帮同事用C语言(不是C++)写一个生产者消费者的任务队列工具库,考虑到不能使用任何第三库和C++的任何特性,所以我将任务队列做成一个链表,生产者在队列尾部加入任务,消费者在队列头部取出任务。很快就写好了,代码如下:

/**

* 线程池工具, ctrip_thread_pool.h

* zhangyl 2018.03.23

*/

#ifndef __CTRIP_THREAD_POOL_H__

#define __CTRIP_THREAD_POOL_H__

#include <pthread.h>

#ifndef NULL

#define NULL 0

#endif

#define PUBLIC

PUBLIC struct ctrip_task

{

struct ctrip_task* pNext;

int value;

};

struct ctrip_thread_info

{

//线程退出标志

int thread_running;

int thread_num;

int tasknum;

struct ctrip_task* tasks;

pthread_t* threadid;

pthread_mutex_t mutex;

pthread_cond_t cond;

};

/* 初始化线程池线程数目

* @param thread_num 线程数目, 默认为8个

*/

PUBLIC void ctrip_init_thread_pool(int thread_num);

/* 销毁线程池

*/

PUBLIC void ctrip_destroy_thread_pool();

/**向任务池中增加一个任务

* @param t 需要增加的任务

*/

PUBLIC void ctrip_thread_pool_add_task(struct ctrip_task* t);

/**从任务池中取出一个任务

* @return 返回得到的任务

*/

struct ctrip_task* ctrip_thread_pool_retrieve_task();

/**执行任务池中的任务

* @param t 需要执行的任务

*/

PUBLIC void ctrip_thread_pool_do_task(struct ctrip_task* t);

/**线程函数

* @param thread_param 线程参数

*/

void* ctrip_thread_routine(void* thread_param);

#endif //!__CTRIP_THREAD_POOL_H__

/**

* 线程池工具, ctrip_thread_pool.c

* zhangyl 2018.03.23

*/

#include "ctrip_thread_pool.h"

#include <stdio.h>

#include <stdlib.h>

struct ctrip_thread_info g_threadinfo;

int thread_running = 0;

void ctrip_init_thread_pool(int thread_num)

{

if (thread_num <= 0)

thread_num = 5;

pthread_mutex_init(&g_threadinfo.mutex, NULL);

pthread_cond_init(&g_threadinfo.cond, NULL);

g_threadinfo.thread_num = thread_num;

g_threadinfo.thread_running = 1;

g_threadinfo.tasknum = 0;

g_threadinfo.tasks = NULL;

thread_running = 1;

g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);

int i;

for (i = 0; i < thread_num; ++i)

{

pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL);

}

}

void ctrip_destroy_thread_pool()

{

g_threadinfo.thread_running = 0;

thread_running = 0;

pthread_cond_broadcast(&g_threadinfo.cond);

int i;

for (i = 0; i < g_threadinfo.thread_num; ++i)

{

pthread_join(g_threadinfo.threadid[i], NULL);

}

free(g_threadinfo.threadid);

pthread_mutex_destroy(&g_threadinfo.mutex);

pthread_cond_destroy(&g_threadinfo.cond);

}

void ctrip_thread_pool_add_task(struct ctrip_task* t)

{

if (t == NULL)

return;

pthread_mutex_lock(&g_threadinfo.mutex);

struct ctrip_task* head = g_threadinfo.tasks;

if (head == NULL)

g_threadinfo.tasks = t;

else

{

while (head->pNext != NULL)

{

head = head->pNext;

}

head->pNext = t;

}

++g_threadinfo.tasknum;

//当有变化后,使用signal通知wait函数

pthread_cond_signal(&g_threadinfo.cond);

pthread_mutex_unlock(&g_threadinfo.mutex);

}

struct ctrip_task* ctrip_thread_pool_retrieve_task()

{

struct ctrip_task* head = g_threadinfo.tasks;

if (head != NULL)

{

g_threadinfo.tasks = head->pNext;

--g_threadinfo.tasknum;

printf("retrieve a task, task value is [%d]\n", head->value);

return head;

}

printf("no task\n");

return NULL;

}

void* ctrip_thread_routine(void* thread_param)

{

printf("thread NO.%d start.\n", (int)pthread_self());

while (thread_running/*g_threadinfo.thread_running*/)

{

struct ctrip_task* current = NULL;

pthread_mutex_lock(&g_threadinfo.mutex);

while (g_threadinfo.tasknum <= 0)

{

//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。

//当变化后,条件合适,将直接获得锁。

pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);

if (!g_threadinfo.thread_running)

break;

current = ctrip_thread_pool_retrieve_task();

if (current != NULL)

break;

}// end inner-while-loop

pthread_mutex_unlock(&g_threadinfo.mutex);

ctrip_thread_pool_do_task(current);

}// end outer-while-loop

printf("thread NO.%d exit.\n", (int)pthread_self());

}

void ctrip_thread_pool_do_task(struct ctrip_task* t)

{

if (t == NULL)

return;

//TODO: do your work here

printf("task value is [%d]\n", t->value);

//TODO:如果t需要释放,记得在这里释放

}

测试代码如下:

// ctrip_thread_pool.cpp : Defines the entry point for the console application.

//

//#include "stdafx.h"

#include "ctrip_thread_pool.h"

#include <stdlib.h>

#include <unistd.h>

int main(int argc, char* argv[])

{

ctrip_init_thread_pool(5);

struct ctrip_task* task = NULL;

int i;

for (i = 0; i < 100; ++i)

{

task = (struct ctrip_task*)malloc(sizeof(struct ctrip_task));

task->value = i + 1;

task->pNext = NULL;

printf("add task, task value [%d]\n", task->value);

ctrip_thread_pool_add_task(task);

}

sleep(10);

ctrip_destroy_thread_pool();

return 0;

}

代码很快就写好了,但是每次程序只能执行前几个加到任务池子里面的任务,导致池子有不少任务积累在池子里面。甚是奇怪,我也看了半天才看出结果。聪明的你,能看出上述代码为啥只能执行加到池子里面的前几个任务?先不要看答案,自己想一会儿。

linux条件变量是做后台开发必须熟练掌握的基础知识,而条件变量使用存在以下几个非常让人迷惑的地方,讲解如下

第一、必须要结合一个互斥体一起使用。使用流程如下:

pthread_mutex_lock(&g_threadinfo.mutex)

pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);

pthread_mutex_unlock(&g_threadinfo.mutex);

上面的代码,我们分为一二三步,当条件不满足是pthread_cond_wait会挂起线程,但是不知道你有没有注意到,如果在第二步挂起线程的话,第一步的mutex已经被上锁,谁来解锁?mutex的使用原则是谁上锁谁解锁,所以不可能在其他线程来给这个mutex解锁,但是这个线程已经挂起了,这就死锁了。所以pthread_cond_wait在挂起之前,额外做的一个事情就是给绑定的mutex解锁。反过来,如果条件满足,pthread_cond_wait不挂起线程,pthread_cond_wait将什么也不做,这样就接着走pthread_mutex_unlock解锁的流程。而在这个加锁和解锁之间的代码就是我们操作受保护资源的地方。

第二,不知道你有没有注意到pthread_cond_wait是放在一个while循环里面的:

pthread_mutex_lock(&g_threadinfo.mutex);

while (g_threadinfo.tasknum <= 0)

{

//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。

//当变化后,条件合适,将直接获得锁。

pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);

if (!g_threadinfo.thread_running)

break;

current = ctrip_thread_pool_retrieve_task();

if (current != NULL)

break;

}// end inner-while-loop

pthread_mutex_unlock(&g_threadinfo.mutex);

注意,我说的是内层的while循环,不是外层的。pthread_cond_wait一定要放在一个while循环里面吗?一定要的。这里有一个非常重要的关于条件变量的基础知识,叫条件变量的虚假唤醒(spurious wakeup),那啥叫条件变量的虚假唤醒呢?假设pthread_cond_wait不放在这个while循环里面,正常情况下,pthread_cond_wait因为条件不满足,挂起线程。然后,外部条件满足以后,调用pthread_cond_signal或pthread_cond_broadcast来唤醒挂起的线程。这没啥问题。但是条件变量可能在某些情况下也被唤醒,这个时候pthread_cond_wait处继续往下执行,但是这个时候,条件并不满足(比如任务队列中仍然为空)。这种唤醒我们叫“虚假唤醒”。为了避免虚假唤醒时,做无意义的动作,我们将pthread_cond_wait放到while循环条件中,这样即使被虚假唤醒了,由于while条件(比如任务队列是否为空,资源数量是否大于0)仍然为true,导致线程进行继续挂起。有人说条件变量是最不可能用错的线程之间同步技术,我却觉得这是最容易使用错误的线程之间同步技术。

上述代码存在的问题是,只考虑了任务队列开始为空,生产者后来添加了任务,条件变量被唤醒,然后消费者取任务执行的逻辑。假如一开始池中就有任务呢?这个原因导致,只有开始的几个添加到任务队列中任务被执行。因为一旦任务队列不为空。内层while循环条件将不再满足,导致消费者线程不再从任务队列中取任务消费。正确的代码如下:

/**

* 线程池工具, ctrip_thread_pool.c(修正后的代码)

* zhangyl 2018.03.23

*/

#include "ctrip_thread_pool.h"

#include <stdio.h>

#include <stdlib.h>

struct ctrip_thread_info g_threadinfo;

void ctrip_init_thread_pool(int thread_num)

{

if (thread_num <= 0)

thread_num = 5;

pthread_mutex_init(&g_threadinfo.mutex, NULL);

pthread_cond_init(&g_threadinfo.cond, NULL);

g_threadinfo.thread_num = thread_num;

g_threadinfo.thread_running = 1;

g_threadinfo.tasknum = 0;

g_threadinfo.tasks = NULL;

g_threadinfo.threadid = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);

int i;

for (i = 0; i < thread_num; ++i)

{

pthread_create(&g_threadinfo.threadid[i], NULL, ctrip_thread_routine, NULL);

}

}

void ctrip_destroy_thread_pool()

{

g_threadinfo.thread_running = 0;

pthread_cond_broadcast(&g_threadinfo.cond);

int i;

for (i = 0; i < g_threadinfo.thread_num; ++i)

{

pthread_join(g_threadinfo.threadid[i], NULL);

}

free(g_threadinfo.threadid);

pthread_mutex_destroy(&g_threadinfo.mutex);

pthread_cond_destroy(&g_threadinfo.cond);

}

void ctrip_thread_pool_add_task(struct ctrip_task* t)

{

if (t == NULL)

return;

pthread_mutex_lock(&g_threadinfo.mutex);

struct ctrip_task* head = g_threadinfo.tasks;

if (head == NULL)

g_threadinfo.tasks = t;

else

{

while (head->pNext != NULL)

{

head = head->pNext;

}

head->pNext = t;

}

++g_threadinfo.tasknum;

//当有变化后,使用signal通知wait函数

pthread_cond_signal(&g_threadinfo.cond);

pthread_mutex_unlock(&g_threadinfo.mutex);

}

struct ctrip_task* ctrip_thread_pool_retrieve_task()

{

struct ctrip_task* head = g_threadinfo.tasks;

if (head != NULL)

{

g_threadinfo.tasks = head->pNext;

--g_threadinfo.tasknum;

printf("retrieve a task, task value is [%d]\n", head->value);

return head;

}

printf("no task\n");

return NULL;

}

void* ctrip_thread_routine(void* thread_param)

{

printf("thread NO.%d start.\n", (int)pthread_self());

while (g_threadinfo.thread_running)

{

struct ctrip_task* current = NULL;

pthread_mutex_lock(&g_threadinfo.mutex);

while (g_threadinfo.tasknum <= 0)

{

//如果获得了互斥锁,但是条件不合适的话,wait会释放锁,不往下执行。

//当变化后,条件合适,将直接获得锁。

pthread_cond_wait(&g_threadinfo.cond, &g_threadinfo.mutex);

if (!g_threadinfo.thread_running)

break;

}// end inner-while-loop

current = ctrip_thread_pool_retrieve_task();

pthread_mutex_unlock(&g_threadinfo.mutex);

ctrip_thread_pool_do_task(current);

}// end outer-while-loop

printf("thread NO.%d exit.\n", (int)pthread_self());

}

void ctrip_thread_pool_do_task(struct ctrip_task* t)

{

if (t == NULL)

return;

//TODO: do your work here

printf("task value is [%d]\n", t->value);

//TODO:如果t需要释放,记得在这里释放

}

ok,不知道你有没有看明白呀?

免责声明:内容和图片源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容

预约申请免费试听课

填写下面表单即可预约申请免费试听!怕钱不够?可就业挣钱后再付学费! 怕学不会?助教全程陪读,随时解惑!担心就业?一地学习,可全国推荐就业!

上一篇:C++培训课程笔记--拷贝构造函数中的陷阱
下一篇:C++开发-人脸性别识别总结

Copyright © 2023 Tedu.cn All Rights Reserved 京ICP备08000853号-56 京公网安备 11010802029508号 达内时代科技集团有限公司 版权所有

选择城市和中心
黑龙江省

吉林省

河北省

湖南省

贵州省

云南省

广西省

海南省