搜索
查看: 925|回复: 0

Linux下线程同步机制

[复制链接]

128

主题

5

回帖

433

积分

中级会员

积分
433
发表于 2014-9-13 11:38:15 | 显示全部楼层 |阅读模式

线程的最大特点是资源的共享性,但资源共享中的同步问题是多线程编程的难点。linux下提供了多种方式来处理线程同步,最常用的是互斥锁、条件变量和信号灯和异步信号等。

一、互斥锁

尽管在Posix Thread中同样可以使用IPC的信号量机制来实现互斥锁mutex功能,但显然semphore的功能过于强大了,在Posix Thread中定义了另外一套专门用于线程同步的mutex函数。

1. 创建和销毁

有两种方法创建互斥锁,静态方式和动态方式。

POSIX定义了一个宏PTHREAD_MUTEX_INITIALIZER来静态初始化互斥锁,方法如下: pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; 在LinuxThreads实现中,pthread_mutex_t是一个结构,而PTHREAD_MUTEX_INITIALIZER则是一个结构常量。

动态方式是采用pthread_mutex_init()函数来初始化互斥锁,API定义如下: int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr) 其中mutexattr用于指定互斥锁属性(见下),如果为NULL则使用缺省属性。

pthread_mutex_destroy()用于注销一个互斥锁,API定义如下: int pthread_mutex_destroy(pthread_mutex_t *mutex) 销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。由于在Linux中,互斥锁并不占用任何资源,因此LinuxThreads中的pthread_mutex_destroy()除了检查锁状态以外(锁定状态则返回EBUSY)没有其他动作。

2. 互斥锁属性

互斥锁的属性在创建锁的时候指定,在LinuxThreads实现中仅有一个锁类型属性,不同的锁类型在试图对一个已经被锁定的互斥锁加锁时表现不同。当前(glibc2.2.3,linuxthreads0.9)有四个值可供选择:

PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。
PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。

3. 锁操作

锁操作主要包括加锁pthread_mutex_lock()、解锁pthread_mutex_unlock()和测试加锁pthread_mutex_trylock()三个,不论哪种类型的锁,都不可能被两个不同的线程同时得到,而必须等待解锁。对于普通锁和适应锁类型,解锁者可以是同进程内任何线程;而检错锁则必须由加锁者解锁才有效,否则返回EPERM;对于嵌套锁,文档和实现要求必须由加锁者解锁,但实验结果表明并没有这种限制,这个不同目前还没有得到解释。在同一进程中的线程,如果加锁后没有解锁,则任何其他线程都无法再获得锁。

//加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex) int pthread_mutex_unlock(pthread_mutex_t *mutex)int pthread_mutex_trylock(pthread_mutex_t *mutex)

pthread_mutex_trylock()语义与pthread_mutex_lock()类似,不同的是在锁已经被占据时返回EBUSY而不是挂起等待。

4. 其他

POSIX线程锁机制的Linux实现都不是取消点,因此,延迟取消类型的线程不会因收到取消信号而离开加锁等待。值得注意的是,如果线程在加锁后解锁前被取消,锁将永远保持锁定状态,因此如果在关键区段内有取消点存在,或者设置了异步取消类型,则必须在退出回调函数中解锁。

这个锁机制同时也不是异步信号安全的,也就是说,不应该在信号处理过程中使用互斥锁,否则容易造成死锁。

#include <cstdio>#include <cstdlib>#include <unistd.h>#include <pthread.h>#include "iostream"using namespace std;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;int tmp;void* thread(void *arg){    cout << "thread id is " << pthread_self() << endl;    pthread_mutex_lock(&mutex);    tmp = 12;    cout << "Now a is " << tmp << endl;    pthread_mutex_unlock(&mutex);    return NULL;}int main(){    pthread_t id;    cout << "main thread id is " << pthread_self() << endl;    tmp = 3;    cout << "In main func tmp = " << tmp << endl;    if (!pthread_create(&id, NULL, thread, NULL))    {        cout << "Create thread success!" << endl;    }    else    {        cout << "Create thread failed!" << endl;    }    pthread_join(id, NULL);    pthread_mutex_destroy(&mutex);    return 0;}

二、条件变量

条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。

1. 创建和注销

条件变量和互斥锁一样,都有静态动态两种创建方式,静态方式使用PTHREAD_COND_INITIALIZER常量,如下:

pthread_cond_t cond=PTHREAD_COND_INITIALIZER

动态方式调用pthread_cond_init()函数,API定义如下:

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr)  

尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。

注销一个条件变量需要调用pthread_cond_destroy(),只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程。API定义如下:
int pthread_cond_destroy(pthread_cond_t *cond)

2. 等待和激发

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)

等待条件有两种方式:无条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(),其中计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEOUT,结束等待,其中abstime以与time()系统调用相同意义的绝对时间形式出现,0表示格林尼治时间1970年1月1日0时0分0秒。

无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。

激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。

3. 其他

pthread_cond_wait()和pthread_cond_timedwait()都被实现为取消点,因此,在该处等待的线程将立即重新运行,在重新锁定mutex后离开pthread_cond_wait(),然后执行取消动作。也就是说如果pthread_cond_wait()被取消,mutex是保持锁定状态的,因而需要定义退出回调函数来为其解锁。

以下示例集中演示了互斥锁和条件变量的结合使用,以及取消对于条件等待动作的影响。在例子中,有两个线程被启动,并等待同一个条件变量,如果不使用退出回调函数(见范例中的注释部分),则tid2将在pthread_mutex_lock()处永久等待。如果使用回调函数,则tid2的条件等待及主线程的条件激发都能正常工作。

#include <stdio.h>#include <pthread.h>#include <unistd.h>pthread_mutex_t mutex;pthread_cond_t  cond;void * child1(void *arg){        pthread_cleanup_push(pthread_mutex_unlock,&mutex);  /* comment 1 */        while(1){                printf("thread 1 get running \n");        printf("thread 1 pthread_mutex_lock returns %d\n",pthread_mutex_lock(&mutex));        pthread_cond_wait(&cond,&mutex);                    printf("thread 1 condition applied\n");        pthread_mutex_unlock(&mutex);                    sleep(5);    }        pthread_cleanup_pop(0);     /* comment 2 */}void *child2(void *arg){        while(1){                sleep(3);               /* comment 3 */                printf("thread 2 get running.\n");        printf("thread 2 pthread_mutex_lock returns %d\n",        pthread_mutex_lock(&mutex));        pthread_cond_wait(&cond,&mutex);        printf("thread 2 condition applied\n");        pthread_mutex_unlock(&mutex);        sleep(1);        }}int main(void){        int tid1,tid2;        printf("hello, condition variable test\n");        pthread_mutex_init(&mutex,NULL);        pthread_cond_init(&cond,NULL);        pthread_create(&tid1,NULL,child1,NULL);        pthread_create(&tid2,NULL,child2,NULL);        do{        sleep(2);                   /* comment 4 */                pthread_cancel(tid1);       /* comment 5 */                sleep(2);                   /* comment 6 */        pthread_cond_signal(&cond);    }while(1);         sleep(100);        pthread_exit(0);}         #include <stdio.h>        #include <pthread.h>        #include "stdlib.h"        #include "unistd.h"        pthread_mutex_t mutex;        pthread_cond_t cond;        void hander(void *arg)        {            free(arg);            (void)pthread_mutex_unlock(&mutex);        }        void *thread1(void *arg)        {            pthread_cleanup_push(hander, &mutex);            while(1)            {                printf("thread1 is running\n");                pthread_mutex_lock(&mutex);                pthread_cond_wait(&cond, &mutex);                printf("thread1 applied the condition\n");                pthread_mutex_unlock(&mutex);                sleep(4);            }            pthread_cleanup_pop(0);        }        void *thread2(void *arg)        {            while(1)            {                printf("thread2 is running\n");                pthread_mutex_lock(&mutex);                pthread_cond_wait(&cond, &mutex);                printf("thread2 applied the condition\n");                pthread_mutex_unlock(&mutex);                sleep(1);            }        }        int main()        {            pthread_t thid1,thid2;            printf("condition variable study!\n");            pthread_mutex_init(&mutex, NULL);            pthread_cond_init(&cond, NULL);            pthread_create(&thid1, NULL, thread1, NULL);            pthread_create(&thid2, NULL, thread2, NULL);            sleep(1);            do            {                pthread_cond_signal(&cond);            }while(1);            sleep(20);            pthread_exit(0);            return 0;        }    #include <pthread.h>    #include <unistd.h>    #include "stdio.h"    #include "stdlib.h"    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    struct node    {        int n_number;        struct node *n_next;    }*head = NULL;    static void cleanup_handler(void *arg)    {        printf("Cleanup handler of second thread./n");        free(arg);        (void)pthread_mutex_unlock(&mtx);    }    static void *thread_func(void *arg)    {        struct node *p = NULL;        pthread_cleanup_push(cleanup_handler, p);        while (1)        {            //这个mutex主要是用来保证pthread_cond_wait的并发性            pthread_mutex_lock(&mtx);            while (head == NULL)            {                //这个while要特别说明一下,单个pthread_cond_wait功能很完善,为何                //这里要有一个while (head == NULL)呢?因为pthread_cond_wait里的线                //程可能会被意外唤醒,如果这个时候head != NULL,则不是我们想要的情况。                //这个时候,应该让线程继续进入pthread_cond_wait                // pthread_cond_wait会先解除之前的pthread_mutex_lock锁定的mtx,                //然后阻塞在等待对列里休眠,直到再次被唤醒(大多数情况下是等待的条件成立                //而被唤醒,唤醒后,该进程会先锁定先pthread_mutex_lock(&mtx);,再读取资源                //用这个流程是比较清楚的                pthread_cond_wait(&cond, &mtx);                p = head;                head = head->n_next;                printf("Got %d from front of queue/n", p->n_number);                free(p);            }            pthread_mutex_unlock(&mtx); //临界区数据操作完毕,释放互斥锁        }        pthread_cleanup_pop(0);        return 0;    }    int main(void)    {        pthread_t tid;        int i;        struct node *p;        //子线程会一直等待资源,类似生产者和消费者,但是这里的消费者可以是多个消费者,而        //不仅仅支持普通的单个消费者,这个模型虽然简单,但是很强大        pthread_create(&tid, NULL, thread_func, NULL);        sleep(1);        for (i = 0; i < 10; i++)        {            p = (struct node*)malloc(sizeof(struct node));            p->n_number = i;            pthread_mutex_lock(&mtx); //需要操作head这个临界资源,先加锁,            p->n_next = head;            head = p;            pthread_cond_signal(&cond);            pthread_mutex_unlock(&mtx); //解锁            sleep(1);        }        printf("thread 1 wanna end the line.So cancel thread 2./n");        //关于pthread_cancel,有一点额外的说明,它是从外部终止子线程,子线程会在最近的取消点,退出        //线程,而在我们的代码里,最近的取消点肯定就是pthread_cond_wait()了。        pthread_cancel(tid);        pthread_join(tid, NULL);        printf("All done -- exiting/n");        return 0;    }

如果不做注释5的pthread_cancel()动作,即使没有那些sleep()延时操作,child1和child2都能正常工作。注释3和注释4的延迟使得child1有时间完成取消动作,从而使child2能在child1退出之后进入请求锁操作。如果没有注释1和注释2的回调函数定义,系统将挂起在child2请求锁的地方;而如果同时也不做注释3和注释4的延时,child2能在child1完成取消动作以前得到控制,从而顺利执行申请锁的操作,但却可能挂起在pthread_cond_wait()中,因为其中也有申请mutex的操作。child1函数给出的是标准的条件变量的使用方式:回调函数保护,等待条件前锁定,pthread_cond_wait()返回后解锁。

条件变量机制不是异步信号安全的,也就是说,在信号处理函数中调用pthread_cond_signal()或者pthread_cond_broadcast()很可能引起死锁。

三、信号灯

信号灯与互斥锁和条件变量的主要不同在于"灯"的概念,灯亮则意味着资源可用,灯灭则意味着不可用。如果说后两中同步方式侧重于"等待"操作,即资源不可用的话,信号灯机制则侧重于点灯,即告知资源可用;没有等待线程的解锁或激发条件都是没有意义的,而没有等待灯亮的线程的点灯操作则有效,且能保持灯亮状态。当然,这样的操作原语也意味着更多的开销。

信号灯的应用除了灯亮/灯灭这种二元灯以外,也可以采用大于1的灯数,以表示资源数大于1,这时可以称之为多元灯。

1. 创建和注销

POSIX信号灯标准定义了有名信号灯和无名信号灯两种,但LinuxThreads的实现仅有无名灯,同时有名灯除了总是可用于多进程之间以外,在使用上与无名灯并没有很大的区别,因此下面仅就无名灯进行讨论。

int sem_init(sem_t *sem, int pshared, unsigned int value)这是创建信号灯的API,其中value为信号灯的初值,pshared表示是否为多进程共享而不仅仅是用于一个进程。LinuxThreads没有实现多进程共享信号灯,因此所有非0值的pshared输入都将使sem_init()返回-1,且置errno为ENOSYS。初始化好的信号灯由sem变量表征,用于以下点灯、灭灯操作。int sem_destroy(sem_t * sem) 被注销的信号灯sem要求已没有线程在等待该信号灯,否则返回-1,且置errno为EBUSY。除此之外,LinuxThreads的信号灯注销函数不做其他动作。

2. 点灯和灭灯

int sem_post(sem_t * sem)

点灯操作将信号灯值原子地加1,表示增加一个可访问的资源。

int sem_wait(sem_t * sem)int sem_trywait(sem_t * sem)

sem_wait()为等待灯亮操作,等待灯亮(信号灯值大于0),然后将信号灯原子地减1,并返回。sem_trywait()为sem_wait()的非阻塞版,如果信号灯计数大于0,则原子地减1并返回0,否则立即返回-1,errno置为EAGAIN。

3. 获取灯值

int sem_getvalue(sem_t * sem, int * sval)

读取sem中的灯计数,存于*sval中,并返回0。

4. 其他

sem_wait()被实现为取消点,而且在支持原子"比较且交换"指令的体系结构上,sem_post()是唯一能用于异步信号处理函数的POSIX异步信号安全的API。

    #include <stdlib.h>    #include <stdio.h>    #include <unistd.h>    #include <pthread.h>    #include <semaphore.h>    #include <errno.h>    #define return_if_fail(p) if((p) == 0){printf ("[%s]:func error!/n", __func__);return;}    typedef struct _PrivInfo    {        sem_t s1;        sem_t s2;        time_t end_time;    }PrivInfo;    static void info_init (PrivInfo* thiz);    static void info_destroy (PrivInfo* thiz);    static void* pthread_func_1 (PrivInfo* thiz);    static void* pthread_func_2 (PrivInfo* thiz);    int main (int argc, char** argv)    {        pthread_t pt_1 = 0;        pthread_t pt_2 = 0;        int ret = 0;        PrivInfo* thiz = NULL;        thiz = (PrivInfo* )malloc (sizeof (PrivInfo));        if (thiz == NULL)        {            printf ("[%s]: Failed to malloc priv./n");            return -1;        }        info_init (thiz);        ret = pthread_create (&pt_1, NULL, (void*)pthread_func_1, thiz);        if (ret != 0)        {            perror ("pthread_1_create:");        }        ret = pthread_create (&pt_2, NULL, (void*)pthread_func_2, thiz);        if (ret != 0)        {            perror ("pthread_2_create:");        }        pthread_join (pt_1, NULL);        pthread_join (pt_2, NULL);        info_destroy (thiz);        return 0;    }    static void info_init (PrivInfo* thiz)    {        return_if_fail (thiz != NULL);        thiz->end_time = time(NULL) + 10;        sem_init (&thiz->s1, 0, 1);        sem_init (&thiz->s2, 0, 0);        return;    }    static void info_destroy (PrivInfo* thiz)    {        return_if_fail (thiz != NULL);        sem_destroy (&thiz->s1);        sem_destroy (&thiz->s2);        free (thiz);        thiz = NULL;        return;    }    static void* pthread_func_1 (PrivInfo* thiz)    {        return_if_fail(thiz != NULL);        while (time(NULL) < thiz->end_time)        {            sem_wait (&thiz->s2);            printf ("pthread1: pthread1 get the lock./n");            sem_post (&thiz->s1);            printf ("pthread1: pthread1 unlock/n");            sleep (1);        }        return;    }    static void* pthread_func_2 (PrivInfo* thiz)    {        return_if_fail (thiz != NULL);        while (time (NULL) < thiz->end_time)        {            sem_wait (&thiz->s1);            printf ("pthread2: pthread2 get the unlock./n");            sem_post (&thiz->s2);            printf ("pthread2: pthread2 unlock./n");            sleep (1);        }        return;    }

四、异步信号

由于LinuxThreads是在核外使用核内轻量级进程实现的线程,所以基于内核的异步信号操作对于线程也是有效的。但同时,由于异步信号总是实际发往某个进程,所以无法实现POSIX标准所要求的"信号到达某个进程,然后再由该进程将信号分发到所有没有阻塞该信号的线程中"原语,而是只能影响到其中一个线程。

POSIX异步信号同时也是一个标准C库提供的功能,主要包括信号集管理(sigemptyset()、sigfillset()、sigaddset()、sigdelset()、sigismember()等)、信号处理函数安装(sigaction())、信号阻塞控制(sigprocmask())、被阻塞信号查询(sigpending())、信号等待(sigsuspend())等,它们与发送信号的kill()等函数配合就能实现进程间异步信号功能。LinuxThreads围绕线程封装了sigaction()何raise(),本节集中讨论LinuxThreads中扩展的异步信号函数,包括pthread_sigmask()、pthread_kill()和sigwait()三个函数。毫无疑问,所有POSIX异步信号函数对于线程都是可用的。

int pthread_sigmask(int how, const sigset_t *newmask, sigset_t *oldmask)
设置线程的信号屏蔽码,语义与sigprocmask()相同,但对不允许屏蔽的Cancel信号和不允许响应的Restart信号进行了保护。被屏蔽的信号保存在信号队列中,可由sigpending()函数取出。

int pthread_kill(pthread_t thread, int signo)
向thread号线程发送signo信号。实现中在通过thread线程号定位到对应进程号以后使用kill()系统调用完成发送。

int sigwait(const sigset_t *set, int *sig)
挂起线程,等待set中指定的信号之一到达,并将到达的信号存入*sig中。POSIX标准建议在调用sigwait()等待信号以前,进程中所有线程都应屏蔽该信号,以保证仅有sigwait()的调用者获得该信号,因此,对于需要等待同步的异步信号,总是应该在创建任何线程以前调用pthread_sigmask()屏蔽该信号的处理。而且,调用sigwait()期间,原来附接在该信号上的信号处理函数不会被调用。

如果在等待期间接收到Cancel信号,则立即退出等待,也就是说sigwait()被实现为取消点。

五、其他同步方式

除了上述讨论的同步方式以外,其他很多进程间通信手段对于LinuxThreads也是可用的,比如基于文件系统的IPC(管道、Unix域Socket等)、消息队列(Sys.V或者Posix的)、System V的信号灯等。只有一点需要注意,LinuxThreads在核内是作为共享存储区、共享文件系统属性、共享信号处理、共享文件描述符的独立进程看待的。

六、条件变量与互斥锁、信号量的区别

1).互斥锁必须总是由给它上锁的线程解锁,信号量的挂出即不必由执行过它的等待操作的同一进程执行。一个线程可以等待某个给定信号灯,而另一个线程可以挂出该信号灯。

2).互斥锁要么锁住,要么被解开(二值状态,类型二值信号量)

3).由于信号量有一个与之关联的状态(它的计数值),信号量挂出操作总是被记住。然而当向一个条件变量发送信号时,如果没有线程等待在该条件变量上,那么该信号将丢失。

4).互斥锁是为了上锁而设计的,条件变量是为了等待而设计的,信号灯即可用于上锁,也可用于等待,因而可能导致更多的开销和更高的复杂性。


您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

 
 
大数据行业交流
大数据行业交流
大数据求职招聘
大数据求职招聘
站长电话:
15010106923
微信联系:
hb-0310
站长邮箱:
ab12-120@163.com
大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2024-4-29 11:10 , Processed in 0.103547 second(s), 24 queries .

快速回复 返回顶部 返回列表