Skip to content

Latest commit

 

History

History
265 lines (253 loc) · 14.9 KB

05.锁和信号(下).md

File metadata and controls

265 lines (253 loc) · 14.9 KB

Swoole源码学习记录


##Swoole版本:1.7.4-stable

###二.Mutex互斥锁 接下来是Mutex(互斥锁)。Swoole的Mutex实现是基于pthread_mutex*方法族实现的,Rango用一个swMutex结构体封装了mutex的两个属性,结构体定义如下:

    //互斥锁
    typedef struct _swMutex
    {
        pthread_mutex_t _lock;      // 互斥锁对象
        pthread_mutexattr_t attr;   // 互斥锁的属性
    } swMutex;

创建一个互斥锁的函数声明在swoole.h文件的 536 行,声明如下:

    int swMutex_create(swLock *lock, int use_in_process);

其中lock为锁对象,use_in_process用于标记该锁是否用于进程之间。该函数的具体定义在Mutex.c文件中,其核心代码如下:

    lock->type = SW_MUTEX;
    pthread_mutexattr_init(&lock->object.mutex.attr);
    if (use_in_process == 1)
    {
        pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
    }
    if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
    {
        return SW_ERR;
    }

源码解释:设置锁的类型为SW_MUTEX,并调用pthread_mutexattr_init函数初始化mutex的属性结构体。如果该进程用于进程之间,则调用pthread_mutexattr_setpshared函数设置该锁的共享属性为PTHREAD_PROCESS_SHARED(进程间共享)。接着调用pthread_mutex_init函数初始化互斥锁。

这里要重点分析三个粗体的函数。这三个函数都属于pthread_mutex*方法族,用于操作mutex以及其attr属性。

  1. pthread_mutexattr_init方法用于初始化一个pthread_mutexattr_t结构体,并默认设置其pshared属性为PTHREAD_PROCESS_PRIVATE(表示可以在进程内使用该互斥锁)。
  2. pthread_mutexattr_setpshared方法用来设置互斥锁变量的作用域。PTHREAD_PROCESS_SHARED表示该互斥锁可被多个进程共享使用(前提是该互斥锁是在共享内存中创建)。PTHREAD_PROCESS_PRIVATE表示该互斥锁仅能被那些由同一个进程创建的线程处理。
  3. pthread_mutex_init方法以动态方式创建互斥锁,并通过参数attr指定互斥锁属性。如果参数attr为空,则默认创建快速互斥锁。

swMutex的其他操作函数swMutex_lock、swMutex_unlock、swMutex_trylock、free分别对应pthread_mutex_lock、pthread_mutex_unlock、pthread_mutex_trylock、pthread_mutex_destroy,用于加锁、解锁、尝试加锁、销毁swMutex。

###三.RWLock读写锁 读写锁的内容其实与互斥锁相差不大,区别仅在于读写锁有读锁、写锁两个区别,同时底层调用的是pthread_rwlock*系列函数。读写锁swRWLock结构体在swoole.c文件的 398 – 404 行声明,如下:

    //读写锁
    typedef struct _swRWLock
    {
        pthread_rwlock_t _lock;     // 读写锁对象
        pthread_rwlockattr_t attr;  // 读写锁属性
    
    } swRWLock;

创建一个读写锁的函数声明在swoole.h文件的 534 行,声明如下:

    int swRWLock_create(swLock *lock, int use_in_process);

其中lock为锁对象,use_in_process用于标记该锁是否用于进程之间。该函数的具体定义在RWLock.c文件中,其核心代码如下:

    lock->type = SW_RWLOCK;
    if(use_in_process == 1)
    {
        pthread_rwlockattr_setpshared(&lock->object.rwlock.attr, PTHREAD_PROCESS_SHARED);
    }
    if((ret = pthread_rwlock_init(&lock->object.rwlock._lock, &lock->object.rwlock.attr)) < 0)
    {
        return SW_ERR;
    }

源码解释:设置锁类型为SW_RWLOCK,如果该锁用于进程之间,则调用pthread_rwlockattr_setpshared函数设置该锁的共享属性为PTHREAD_PROCESS_SHARED(进程间共享)。接着调用pthread_rwlock_init初始化该读写锁。

这里重点分析两个粗体函数。这两个函数属于pthread_rwlock*方法族,用于操作rwlock以及其attr属性。

  1. pthread_rwlockattr_setpshared用于设置一个pthread_rwlockattr_t的属性,和mutex一样有PTHREAD_PROCESS_SHARED和PTHREAD_PROCESS_PRIVATE两个值。
  2. pthread_rwlock_init用于初始化一个pthread_rwlock_t结构体,创建一个读写锁,并通过attr设置其属性。 (实际上rwlock也有pthread_rwlockattr_init方法,不知道为什么这里没有调用。所有pthread_rwlock*相关的函数请参考http://blog.163.com/software_ark/blog/static/175614594201181665330631/)

剩下的就是通过调用pthread_rwlock的相关操作函数如pthread_rwlock_rdlock、pthread_rwlock_wrlock来创建读锁、写锁以及解锁。大家可以直接看RWLock.c文件中的定义,在此为节约篇幅不再叙述。 ###四.SpinLock自旋锁 首先需要说明一下什么是自旋锁。Spinlock本质上也是一种互斥锁,它和mutex的区别在于Spinlock是通过busy_wait_loop方式来获得锁,不会使线程状态发生切换(用户态->内核态),因此减少了系统调用,执行速度快。但缺点也是有的,Spinlock会一直占用cpu,导致cpu busy飙高,因此要谨慎使用Spinlock。 SpinLock自旋锁的特殊性在于其只有一个pthread_spinlock_t锁对象,没有对应的属性结构体。其swSpinLock结构体在swoole.c文件中 406 – 412 行声明,如下:

    //自旋锁
    #ifdef HAVE_SPINLOCK
    typedef struct _swSpinLock
    {
        pthread_spinlock_t lock_t;  // 自旋锁对象
    } swSpinLock;
    #endif

创建一个自旋锁的函数声明在swoole.h文件的 534 行,声明如下:

    int swSpinLock_create(swLock *object, int spin); 

其中lock为锁对象,spin(为何不继续使用use_in_process……)用于标记该锁是否用于进程之间。该函数的具体定义在SpinLock.c文件中,其核心代码如下:

    lock->type = SW_SPINLOCK;
    if ((ret = pthread_spin_init(&lock->object.spinlock.lock_t, use_in_process)) < 0)
    {
        return -1;
    }

源码解释:设置锁类型为SW_SPINLOCK,调用pthread_spin_init初始化一个Spinlock对象。

  1. pthread_spin_init用于初始化一个pthread_spinlock_t对象,第二个参数用于指定该spinlock是否可被进程间共享。 其他操作均为直接调用pthread_spin*方法族的相关函数,具体内容请看SpinLock.c文件。 ###五.原子锁 首先我说明一下什么是原子操作。所谓原子操作,就是该操作绝不会在执行完毕前被任何其他任务或事件打断,也就说,它的最小的执行单位,不可能有比它更小的执行单位。通常情况下,原子操作都是用于资源计数。 原子锁就是用来保证操作的原子性。Swoole中自定义了一个原子类型atomic_t,其声明在atomic.h文件中的14行,这个类型在不同位数的操作系统里有不同的长度。同时,swoole使用了gcc提供的__sync_*系列的build-in函数来提供加减和逻辑运算的原子操作。这些函数的声明在atomic.h文件的16 – 20行,如下:
    #define sw_atomic_cmp_set(lock, old, set)   __sync_bool_compare_and_swap(lock, old, set)
    #define sw_atomic_fetch_add(value, add)   __sync_fetch_and_add(value, add)
    #define sw_atomic_fetch_sub(value, sub)   __sync_fetch_and_sub(value, sub)
    #define sw_atomic_memory_barrier()        __sync_synchronize()
    #define sw_atomic_cpu_pause()             __asm__ ("pause")

这里依次介绍5个函数的作用:

  1. sw_atomic_cmp_set(lock, old, set) 如果lock的值等于old的值,将set的值写入lock。如果相等并写入成功则返回true
  2. sw_atomic_fetch_add(value, add) 将value的值增加add,返回增加前的值
  3. sw_atomic_fetch_sub(value, sub) 将value的值减去sub,返回减去前的值
  4. sw_atomic_memory_barrier() 发出一个full barrier。保证指令执行的顺序合理
  5. sw_atomic_cpu_pause() 这个函数调用的是__asm__(“pause”),通过一番查询,这个调用能让cpu等待一段时间,这个时间由处理器定义。

Swoole另外声明了一个结构体swAtomicLock来封装原子锁,该结构体在swoole.h文件中 414 – 419 行声明,如下:

    //原子锁Lock-Free
    typedef struct _swAtomicLock
    {
        atomic_t lock_t;
        uint32_t spin;
    } swAtomicLock;

其中lock_t为原子锁对象,spin为一个原子锁可以等待的次数(为2的n次幂)。 原子锁的全部操作函数声明在swoole.h文件的541 – 544行,如下:

    int swAtomicLock_create(swLock *object, int spin);
    sw_inline int swAtomicLock_lock(swLock *lock);
    sw_inline int swAtomicLock_unlock(swLock *lock);
    sw_inline int swAtomicLock_trylock(swLock *lock);

这里着重介绍swAtomicLock_lock方法,其核心代码如下:

    atomic_t *atomic = &lock->object.atomlock.lock_t;
    uint32_t i, n;
    while (1)
    {
        if (*atomic == 0 && sw_atomic_cmp_set(atomic, 0, 1))
        {
            return SW_OK;
        }
        if (SW_CPU_NUM > 1)
        {
            for (n = 1; n < lock->object.atomlock.spin; n <<= 1)
            {
                for (i = 0; i < n; i++)
                {
                    sw_atomic_cpu_pause();
                }

                if (*atomic == 0 && sw_atomic_cmp_set(atomic, 0, 1))
                {
                    return SW_OK;
                }
            }
        }
        swYield();
    }
    return SW_ERR;

源码解释:首先获得atomic_t对象的指针,接着进入一个无限循环。在该循环里,首先判断atomic的值是否为0,并尝试将其赋值为1,如果成功,则直接返回。若前两个判断条件不成立,接着判断CPU数目是否大于1(多核),如果是则进入循环,循环变量为n,每次循环内先执行n次sw_atomic_cpu_pause(),随后再次尝试设置atomic的值为1。每次循环结束后将n的值左移一位(值*2,这就是为何spin的值要是2的幂)。如果CPU的数目为1或者直到内循环结束也没有设置成功atomic,就调用swYield方法交出线程的使用权。 这里就是在不停的尝试申请atomic锁,如果失败就等待一段时间后再次申请,每次失败后等待的时间会加长。 ###六.信号量 Swoole的Semaphore信号量的实现是基于Linux的semget、semop和semctl函数实现的。结构体swSem用于封装信号量相关的属性,其声明在swoole.h的421-427行,如下:

    //信号量
    typedef struct _swSem
    {
        key_t key;  // 关键字,通常由ftok()返回
        int semid;  // 信号量id
        int lock_num;   // 
    } swSem;

创建一个swSem的函数声明在swoole.h文件的535行,如下:

    int swSem_create(swLock *lock, key_t key, int n);

其中lock为信号量对象,key为信号量关键字,n为该信号量拥有的资源数。 该函数具体定义在Semaphore.c文件内,其核心代码如下:

    lock->type = SW_SEM;
    if ((ret = semget(key, n, IPC_CREAT | 0666)) < 0)
    {
        return SW_ERR;
    }
    lock->object.sem.semid = ret;
    lock->object.sem.lock_num = 0;

源码解释:设置锁类型为SW_SEM,并调用semget函数创造一个信号量,并返回信号的id。设置信号量id并设置lock_num为0。 swSem的操作函数共有三个:swSem_lock,swSem_unlock,swSem_free。分别对应加锁、解锁和销毁。 swSem_lock的核心代码如下:

    struct sembuf sem;
    sem.sem_flg = SEM_UNDO;
    sem.sem_num = lock->object.sem.lock_num;
    // sem.sem_op = 1; 此行为原始代码,经作者确认为bug
    sem.sem_op = -1;
    return semop(lock->object.sem.semid, &sem, 1);

源码解释:创建一个struct sembuf 结构体,设置sem_flg为SEM_UNFO(程序结束时(不论正常或不正常),保证信号值会被重设为semop()调用前的值),指定操作的信号量为lock_num,设置sem_op为-1(请求资源),最后调用semop函数操作信号量。 swSem_unlock的核心代码如下:

    struct sembuf sem;
    sem.sem_flg = SEM_UNDO;
    sem.sem_num = lock->object.sem.lock_num;
    // sem.sem_op = -1; 此行为原始代码,经作者确认为bug
    sem.sem_op = 1;
    return semop(lock->object.sem.semid, &sem, 1);

源码解释:创建一个struct sembuf 结构体,设置sem_flg为SEM_UNFO(程序结束时(不论正常或不正常),保证信号值会被重设为semop()调用前的值),指定操作的信号量为lock_num,设置sem_op为1(释放资源),最后调用semop函数操作信号量。 swSem_free函数就是调用semctl方法释放信号量。 ###七.Cond条件变量 条件变量的用处是使线程睡眠等待某种条件出现后唤醒线程,是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个是线程等待“条件成立”而挂起,另一个是线程使“条件成立”并发出信号。为了防止竞争,条件变量通常与一个互斥锁结合在一起。 Swoole中,条件变量的实现使用了pthread_cond*方法族,其封装结构体swCond声明在swoole.c文件的451 - 461行,如下:

    //Cond
    typedef struct _swCond
    {
        swLock lock;                // 互斥锁
        pthread_cond_t cond;        // 条件变量对象
    
        int (*wait)(struct _swCond *object);                // 四个操作函数
        int (*timewait)(struct _swCond *object,long,long);
        int (*notify)(struct _swCond *object);
        int (*broadcast)(struct _swCond *object);
    } swCond;

swCond的相关操作函数共六个,声明在swoole.c文件中的546 – 551行,如下:

    int swCond_create(swCond *cond);
    int swCond_notify(swCond *cond);
    int swCond_broadcast(swCond *cond);
    int swCond_timewait(swCond *cond, long sec, long nsec);
    int swCond_wait(swCond *cond);
    void swCond_free(swCond *cond);

这六个函数均在Cond.c文件中被定义。

  1. swCond_create核心代码:
    if (pthread_cond_init(&cond->cond, NULL) < 0)
    {
        swWarn("pthread_cond_init fail. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    if (swMutex_create(&cond->lock, 0) < 0)
    {
        return SW_ERR;
    }

源码解释:调用pthread_cond_init创建并初始化一个条件变量,并创建对应的互斥锁。 2. swCond_notify方法调用pthread_cond_signal方法,向另一个正处于阻塞等待状态的线程发信号,使其脱离阻塞状态。需要注意的是,如果有多个线程正在等待,则根据优先级的高低决定由哪个线程收到信号;若优先级相同,则优先让等待时间最长的线程执行。 3. swCond_broadcast方法调用pthread_cond_broadcast方法,向所有正处于阻塞等待状态的线程发出信号使其脱离阻塞状态。 4. swCond_timewait方法调用pthread_cond_timedwait方法,计时等待其他线程发出信号,等待时间由参数long sec,long nsec指定。 5. swCond_wait方法调用pthread_cond_wait方法,等待其他线程发出信号。 6. swCond_free方法调用pthread_cond_destroy方法销毁信号变量,并销毁互斥锁。

至此,swoole全部的锁和信号已分析完成。