Linux 内核揭密

 主页   资讯   文章   代码   电子书 

Synchronization primitives in the Linux kernel. Part 4.

Introduction

This is the fourth part of the chapter which describes synchronization primitives in the Linux kernel and in the previous parts we finished to consider different types spinlocks and semaphore synchronization primitives. We will continue to learn synchronization primitives in this part and consider yet another one which is called - mutex which is stands for MUTual EXclusion.

As in all previous parts of this book, we will try to consider this synchronization primitive from the theoretical side and only than we will consider API provided by the Linux kernel to manipulate with mutexes.

So, let's start.

Concept of mutex

We already familiar with the semaphore synchronization primitive from the previous part. It represented by the:

struct semaphore {
    raw_spinlock_t      lock;
    unsigned int        count;
    struct list_head    wait_list;
};

structure which holds information about state of a lock and list of a lock waiters. Depends on the value of the count field, a semaphore can provide access to a resource of more than one wishing of this resource. The mutex concept is very similar to a semaphore concept. But it has some differences. The main difference between semaphore and mutex synchronization primitive is that mutex has more strict semantic. Unlike a semaphore, only one process may hold mutex at one time and only the owner of a mutex may release or unlock it. Additional difference in implementation of lock API. The semaphore synchronization primitive forces rescheduling of processes which are in waiters list. The implementation of mutex lock API allows to avoid this situation and as a result expensive context switches.

The mutex synchronization primitive represented by the following:

struct mutex {
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq;
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

structure in the Linux kernel. This structure is defined in the include/linux/mutex.h header file and contains similar to the semaphore structure set of fields. The first field of the mutex structure is - count. Value of this field represents state of a mutex. In a case when the value of the count field is 1, a mutex is in unlocked state. When the value of the count field is zero, a mutex is in the locked state. Additionally value of the count field may be negative. In this case a mutex is in the locked state and has possible waiters.

The next two fields of the mutex structure - wait_lock and wait_list are spinlock for the protection of a wait queue and list of waiters which represents this wait queue for a certain lock. As you may notice, the similarity of the mutex and semaphore structures ends. Remaining fields of the mutex structure, as we may see depends on different configuration options of the Linux kernel.

The first field - owner represents process which acquired a lock. As we may see, existence of this field in the mutex structure depends on the CONFIG_DEBUG_MUTEXES or CONFIG_MUTEX_SPIN_ON_OWNER kernel configuration options. Main point of this field and the next osq fields is support of optimistic spinning which we will see later. The last two fields - magic and dep_map are used only in debugging mode. The magic field is to storing a mutex related information for debugging and the second field - lockdep_map is for lock validator of the Linux kernel.

Now, after we have considered the mutex structure, we may consider how this synchronization primitive works in the Linux kernel. As you may guess, a process which wants to acquire a lock, must to decrease value of the mutex->count if possible. And if a process wants to release a lock, it must to increase the same value. That's true. But as you may also guess, it is not so simple in the Linux kernel.

Actually, when a process try to acquire a mutex, there three possible paths:

  • fastpath;
  • midpath;
  • slowpath.

which may be taken, depending on the current state of the mutex. The first path or fastpath is the fastest as you may understand from its name. Everything is easy in this case. Nobody acquired a mutex, so the value of the count field of the mutex structure may be directly decremented. In a case of unlocking of a mutex, the algorithm is the same. A process just increments the value of the count field of the mutex structure. Of course, all of these operations must be atomic.

Yes, this looks pretty easy. But what happens if a process wants to acquire a mutex which is already acquired by other process? In this case, the control will be transferred to the second path - midpath. The midpath or optimistic spinning tries to spin with already familiar for us MCS lock while the lock owner is running. This path will be executed only if there are no other processes ready to run that have higher priority. This path is called optimistic because the waiting task will not be sleep and rescheduled. This allows to avoid expensive context switch.

In the last case, when the fastpath and midpath may not be executed, the last path - slowpath will be executed. This path acts like a semaphore lock. If the lock is unable to be acquired by a process, this process will be added to wait queue which is represented by the following:

struct mutex_waiter {
        struct list_head        list;
        struct task_struct      *task;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
};

structure from the include/linux/mutex.h header file and will be sleep. Before we will consider API which is provided by the Linux kernel for manipulation with mutexes, let's consider the mutex_waiter structure. If you have read the previous part of this chapter, you may notice that the mutex_waiter structure is similar to the semaphore_waiter structure from the kernel/locking/semaphore.c source code file:

struct semaphore_waiter {
        struct list_head list;
        struct task_struct *task;
        bool up;
};

It also contains list and task fields which are represent entry of the mutex wait queue. The one difference here that the mutex_waiter does not contains up field, but contains the magic field which depends on the CONFIG_DEBUG_MUTEXES kernel configuration option and used to store a mutex related information for debugging purpose.

Now we know what is it mutex and how it is represented the Linux kernel. In this case, we may go ahead and start to look at the API which the Linux kernel provides for manipulation of mutexes.

Mutex API

Ok, in the previous paragraph we knew what is it mutex synchronization primitive and saw the mutex structure which represents mutex in the Linux kernel. Now it's time to consider API for manipulation of mutexes. Description of the mutex API is located in the include/linux/mutex.h header file. As always, before we will consider how to acquire and release a mutex, we need to know how to initialize it.

There are two approaches to initialize a mutex. The first is to do it statically. For this purpose the Linux kernel provides following:

#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

macro. Let's consider implementation of this macro. As we may see, the DEFINE_MUTEX macro takes name for the mutex and expands to the definition of the new mutex structure. Additionally new mutex structure get initialized with the __MUTEX_INITIALIZER macro. Let's look at the implementation of the __MUTEX_INITIALIZER:

#define __MUTEX_INITIALIZER(lockname)         \
{                                                             \
       .count = ATOMIC_INIT(1),                               \
       .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock), \
       .wait_list = LIST_HEAD_INIT(lockname.wait_list)        \
}

This macro is defined in the same header file and as we may understand it initializes fields of the mutex structure the initial values. The count field get initialized with the 1 which represents unlocked state of a mutex. The wait_lock spinlock get initialized to the unlocked state and the last field wait_list to empty doubly linked list.

The second approach allows us to initialize a mutex dynamically. To do this we need to call the __mutex_init function from the kernel/locking/mutex.c source code file. Actually, the __mutex_init function rarely called directly. Instead of the __mutex_init, the:

# define mutex_init(mutex)                \
do {                                                    \
        static struct lock_class_key __key;             \
                                                        \
        __mutex_init((mutex), #mutex, &__key);          \
} while (0)

macro is used. We may see that the mutex_init macro just defines the lock_class_key and call the __mutex_init function. Let's look at the implementation of this function:

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif
        debug_mutex_init(lock, name, key);
}

As we may see the __mutex_init function takes three arguments:

  • lock - a mutex itself;
  • name - name of mutex for debugging purpose;
  • key - key for lock validator.

At the beginning of the __mutex_init function, we may see initialization of the mutex state. We set it to unlocked state with the atomic_set function which atomically set the give variable to the given value. After this we may see initialization of the spinlock to the unlocked state which will protect wait queue of the mutex and initialization of the wait queue of the mutex. After this we clear owner of the lock and initialize optimistic queue by the call of the osq_lock_init function from the include/linux/osq_lock.h header file. This function just sets the tail of the optimistic queue to the unlocked state:

static inline bool osq_is_locked(struct optimistic_spin_queue *lock)
{
        return atomic_read(&lock->tail) != OSQ_UNLOCKED_VAL;
}

In the end of the __mutex_init function we may see the call of the debug_mutex_init function, but as I already wrote in previous parts of this chapter, we will not consider debugging related stuff in this chapter.

After the mutex structure is initialized, we may go ahead and will look at the lock and unlock API of mutex synchronization primitive. Implementation of mutex_lock and mutex_unlock functions located in the kernel/locking/mutex.c source code file. First of all let's start from the implementation of the mutex_lock. It looks:

void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        mutex_set_owner(lock);
}

We may see the call of the might_sleep macro from the include/linux/kernel.h header file at the beginning of the mutex_lock function. Implementation of this macro depends on the CONFIG_DEBUG_ATOMIC_SLEEP kernel configuration option and if this option is enabled, this macro just prints a stack trace if it was executed in atomic context. This macro is helper for debugging purposes. In other way this macro does nothing.

After the might_sleep macro, we may see the call of the __mutex_fastpath_lock function. This function is architecture-specific and as we consider x86_64 architecture in this book, the implementation of the __mutex_fastpath_lock is located in the arch/x86/include/asm/mutex_64.h header file. As we may understand from the name of the __mutex_fastpath_lock function, this function will try to acquire lock in a fast path or in other words this function will try to decrement the value of the count of the given mutex.

Implementation of the __mutex_fastpath_lock function consists from two parts. The first part is inline assembly statement. Let's look at it:

asm_volatile_goto(LOCK_PREFIX "   decl %0\n"
                              "   jns %l[exit]\n"
                              : : "m" (v->counter)
                              : "memory", "cc"
                              : exit);

First of all, let's pay attention to the asm_volatile_goto. This macro is defined in the include/linux/compiler-gcc.h header file and just expands to the two inline assembly statements:

#define asm_volatile_goto(x...) do { asm goto(x); asm (""); } while (0)

The first assembly statement contains goto specificator and the second empty inline assembly statement is barrier. Now let's return to the our inline assembly statement. As we may see it starts from the definition of the LOCK_PREFIX macro which just expands to the lock instruction:

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

As we already know from the previous parts, this instruction allows to execute prefixed instruction atomically. So, at the first step in the our assembly statement we try decrement value of the given mutex->counter. At the next step the jns instruction will execute jump at the exit label if the value of the decremented mutex->counter is not negative. The exit label is the second part of the __mutex_fastpath_lock function and it just points to the exit from this function:

exit:
        return;

For this moment he implementation of the __mutex_fastpath_lock function looks pretty easy. But the value of the mutex->counter may be negative after increment. In this case the:

fail_fn(v);

will be called after our inline assembly statement. The fail_fn is the second parameter of the __mutex_fastpath_lock function and represents pointer to function which represents midpath/slowpath paths to acquire the given lock. In our case the fail_fn is the __mutex_lock_slowpath function. Before we will look at the implementation of the __mutex_lock_slowpath function, let's finish with the implementation of the mutex_lock function. In the simplest way, the lock will be acquired successfully by a process and the __mutex_fastpath_lock will be finished. In this case, we just call the

mutex_set_owner(lock);

in the end of the mutex_lock. The mutex_set_owner function is defined in the kernel/locking/mutex.h header file and just sets owner of a lock to the current process:

static inline void mutex_set_owner(struct mutex *lock)
{
        lock->owner = current;
}

In other way, let's consider situation when a process which wants to acquire a lock is unable to do it, because another process already acquired the same lock. We already know that the __mutex_lock_slowpath function will be called in this case. Let's consider implementation of this function. This function is defined in the kernel/locking/mutex.c source code file and starts from the obtaining of the proper mutex by the mutex state given from the __mutex_fastpath_lock with the container_of macro:

__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}

and call the __mutex_lock_common function with the obtained mutex. The __mutex_lock_common function starts from preemtion disabling until rescheduling:

preempt_disable();

After this comes the stage of optimistic spinning. As we already know this stage depends on the CONFIG_MUTEX_SPIN_ON_OWNER kernel configuration option. If this option is disabled, we skip this stage and move at the last path - slowpath of a mutex acquisition:

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
        preempt_enable();
        return 0;
}

First of all the mutex_optimistic_spin function check that we don't need to reschedule or in other words there are no other tasks ready to run that have higher priority. If this check was successful we need to update MCS lock wait queue with the current spin. In this way only one spinner can complete for the mutex at one time:

osq_lock(&lock->osq)

At the next step we start to spin in the next loop:

while (true) {
    owner = READ_ONCE(lock->owner);

    if (owner && !mutex_spin_on_owner(lock, owner))
        break;

    if (mutex_try_to_acquire(lock)) {
        lock_acquired(&lock->dep_map, ip);

        mutex_set_owner(lock);
        osq_unlock(&lock->osq);
        return true;
    }
}

and try to acquire a lock. First of all we try to take current owner and if the owner exists (it may not exists in a case when a process already released a mutex) and we wait for it in the mutex_spin_on_owner function before the owner will release a lock. If new task with higher priority have appeared during wait of the lock owner, we break the loop and go to sleep. In other case, the process already may release a lock, so we try to acquire a lock with the mutex_try_to_acquired. If this operation finished successfully, we set new owner for the given mutex, removes ourself from the MCS wait queue and exit from the mutex_optimistic_spin function. At this state a lock will be acquired by a process and we enable preemtion and exit from the __mutex_lock_common function:

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
    preempt_enable();
    return 0;
}

That's all for this case.

In other case all may not be so successful. For example new task may occur during we spinning in the loop from the mutex_optimistic_spin or even we may not get to this loop from the mutex_optimistic_spin in a case when there were task(s) with higher priority before this loop. Or finally the CONFIG_MUTEX_SPIN_ON_OWNER kernel configuration option disabled. In this case the mutex_optimistic_spin will do nothing:

#ifndef CONFIG_MUTEX_SPIN_ON_OWNER
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    return false;
}
#endif

In all of these cases, the __mutex_lock_common function will acct like a semaphore. We try to acquire a lock again because the owner of a lock might already release a lock before this time:

if (!mutex_is_locked(lock) &&
   (atomic_xchg_acquire(&lock->count, 0) == 1))
      goto skip_wait;

In a failure case the process which wants to acquire a lock will be added to the waiters list

list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

In a successful case we update the owner of a lock, enable preemption and exit from the __mutex_lock_common function:

skip_wait:
        mutex_set_owner(lock);
        preempt_enable();
        return 0; 

In this case a lock will be acquired. If can't acquire a lock for now, we enter into the following loop:

for (;;) {

    if (atomic_read(&lock->count) >= 0 && (atomic_xchg_acquire(&lock->count, -1) == 1))
        break;

    if (unlikely(signal_pending_state(state, task))) {
        ret = -EINTR;
        goto err;
    } 

    __set_task_state(task, state);

     schedule_preempt_disabled();
}

where try to acquire a lock again and exit if this operation was successful. Yes, we try to acquire a lock again right after unsuccessful try before the loop. We need to do it to make sure that we get a wakeup once a lock will be unlocked. Besides this, it allows us to acquire a lock after sleep. In other case we check the current process for pending signals and exit if the process was interrupted by a signal during wait for a lock acquisition. In the end of loop we didn't acquire a lock, so we set the task state for TASK_UNINTERRUPTIBLE and go to sleep with call of the schedule_preempt_disabled function.

That's all. We have considered all three possible paths through which a process may pass when it will wan to acquire a lock. Now let's consider how mutex_unlock is implemented. When the mutex_unlock will be called by a process which wants to release a lock, the __mutex_fastpath_unlock will be called from the arch/x86/include/asm/mutex_64.h header file:

void __sched mutex_unlock(struct mutex *lock)
{
    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

Implementation of the __mutex_fastpath_unlock function is very similar to the implementation of the __mutex_fastpath_lock function:

static inline void __mutex_fastpath_unlock(atomic_t *v,
                                           void (*fail_fn)(atomic_t *))
{
       asm_volatile_goto(LOCK_PREFIX "   incl %0\n"
                         "   jg %l[exit]\n"
                         : : "m" (v->counter)
                         : "memory", "cc"
                         : exit);
       fail_fn(v);
exit:
       return;
}

Actually, there is only one difference. We increment value if the mutex->count. So it will represent unlocked state after this operation. As mutex released, but we have something in the wait queue we need to update it. In this case the fail_fn function will be called which is __mutex_unlock_slowpath. The __mutex_unlock_slowpath function just gets the correct mutex instance by the given mutex->count and calls the __mutex_unlock_common_slowpath function:

__mutex_unlock_slowpath(atomic_t *lock_count)
{
      struct mutex *lock = container_of(lock_count, struct mutex, count);

      __mutex_unlock_common_slowpath(lock, 1);
}

In the __mutex_unlock_common_slowpath function we will get the first entry from the wait queue if the wait queue is not empty and wakeup related process:

if (!list_empty(&lock->wait_list)) {
    struct mutex_waiter *waiter =
           list_entry(lock->wait_list.next, struct mutex_waiter, list); 
                wake_up_process(waiter->task);
}

After this, a mutex will be released by previous process and will be acquired by another process from a wait queue.

That's all. We have considered main API for manipulation with mutexes: mutex_lock and mutex_unlock. Besides this the Linux kernel provides following API:

  • mutex_lock_interruptible;
  • mutex_lock_killable;
  • mutex_trylock.

and corresponding versions of unlock prefixed functions. This part will not describe this API, because it is similar to corresponding API of semaphores. More about it you may read in the previous part.

That's all.

Conclusion

This is the end of the fourth part of the synchronization primitives chapter in the Linux kernel. In this part we met with new synchronization primitive which is called - mutex. From the theoretical side, this synchronization primitive very similar on a semaphore. Actually, mutex represents binary semaphore. But its implementation differs from the implementation of semaphore in the Linux kernel. In the next part we will continue to dive into synchronization primitives in the Linux kernel.

If you have questions or suggestions, feel free to ping me in twitter 0xAX, drop me email or just create issue.

Please note that English is not my first language and I am really sorry for any inconvenience. If you found any mistakes please send me PR to linux-insides.

Links