Yebangyu's Blog

Fond of Concurrency Programming , Distributed System and Machine Learning

Singleton与多线程

如果您和我一样,都只有C++背景,之前对设计模式也一窍不通,那么也没有关系,因为本文不需要您对设计模式有多么了解。

Singleton模式

所谓的单例模式,single instance模式,简单实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Singleton
{
  public:
    static Singleton* instance();
  private:
    //非常重要!别忘了这些!
    Singleton() {/*your implementation*/}
    Singleton(const Singleton&);
    Singleton& operator=(const Singleton&);
  private:
    static Singleton* pInstance;
};

Singleton* Singleton::pInstance = NULL;

Singleton* Singleton::instance()
{
  if (pInstance == NULL) {
    pInstance = new Singleton();
  }
  return pInstance;
}

多线程下的Singleton模式实现

以上实现在单线程下运行良好(不考虑构造函数抛异常,注意,本文我们都不考虑C++ Exception)。如果是多线程呢?很自然的,我们想到可以加一把锁,或许就万事大吉了

1
2
3
4
5
6
7
8
9
Singleton* Singleton::instance()
{
  LockGuard guard(yourlock);
  if (pInstance == NULL) {
    pInstance = new Singleton();
  }
  return pInstance;
  //guard析构,自动调用unlock
}

double check locking

前面的实现中,存在一个问题:因为只初始化一次,因此大部分情况下其实pInstance都不会为空,只需要简单返回pInstance。既然如此,何必大部分情况下都需要加锁呢?毕竟锁会带来不小的代价的。因此就引入了double check locking,降低锁的使用和冲突,fast path的时候尽最大可能没有任何锁的使用和开销:

1
2
3
4
5
6
7
8
9
10
11
Singleton* Singleton::instance()
{
  if (pInstance == NULL) {
    LockGuard guard(yourlock);
    if (pInstance == NULL) {
      pInstance = new Singleton();
    }
    //guard析构,自动调用unlock
  }
  return pInstance;
}

如你所见,当pInstance不为空时,不需要任何加锁、释放锁的动作,不过这回得两次检查pInstance是否为空。这是因为:假设线程1执行第三行,发现pInstance为空,还未执行第四行的时候,被调度了出去;线程2执行第三行,发现pInstance为空,获得锁,创建了实例,释放锁;当线程1重新恢复调度的时候,实例已经被创建,这时候它必须再次检查pInstance是否为空,否则就两次创建了,既不符合单例模式的语义,可能还会有其他更严重的问题(资源泄漏可能还算小事)。

memory reordering

完美了?看起来很完美,但是问题很大。

注意其中的第6行:

pInstance = new Singleton();

它不是原子操作。这条语句,从逻辑和功能上包括三个部分:

1,分配内存空间,大小是sizeof(Singleton)

2,在该内存空间上调用Singleton的构造函数,完成初始化。

3,将pInstance指向该内存空间

信不信由你,编译器可能会对这三个步骤进行乱序,乱序后可能出现这样的情况:

1
2
3
4
5
6
7
8
9
10
11
12
Singleton* Singleton::instance()
{
  if (pInstance == NULL) {
    LockGuard guard(yourlock);
    if (pInstance == NULL) {
      pInstance = operator new (sizeof(Singleton));
      new (pInstance) Singleton();
    }
    //guard析构,自动调用unlock
  }
  return pInstance;
}

也就是说,乱序后,编译器生成的代码的逻辑是:

1,分配内存单元。

2,将pInstance指向该内存空间。

3,在该内存空间调用构造函数,完成初始化。

乱序后,可能出现如下的data race:线程1执行完第六行后,pInstance不为空;线程2执行第三行判断,发现非空,返回,然而pInstance这时候还没有初始化,这几乎肯定会带来严重的问题。

临时变量也于事无补

你可能想使用一个temp指针来避开这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
Singleton* Singleton::instance()
{
  if (pInstance == NULL) {
    LockGuard guard(yourlock);
    if (pInstance == NULL) {
      Singleton *temp = new Singleton();
      pInstance = temp;
    }
    //guard析构,自动调用unlock
  }
  return pInstance;
}

这下总没有问题了吧?有问题。聪明的非常aggressive的编译器会认为temp在这里是毫无必要的,因此会将它去除,根本不引入任何的temp,乱序还是会发生。

哦,你突然想到,volatile可能可以帮你的忙。因此,你可能将程序变为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Singleton
{
  public:
    static volatile Singleton* volatile instance();
  private:
    static volatile Singleton* volatile pInstance;
};

volatile Singleton* Singleton::volatile pInstance = NULL;
volatile Singleton* volatile Singleton::instance()
{
  if (pInstance == NULL) {
    LockGuard guard(yourlock);
    if (pInstance == NULL) {
      volatile Singleton *volatile temp = new volatile Singleton();
      pInstance = temp;
    }
    //guard析构,自动调用unlock
  }
  return pInstance;
}

好多volatile,好吓人。简单解释一下:

volatile int * volatile p;

第一个volatile表明,p指向的内容随时可能会变

第二个volatile表明,p本身可能随时会变(p可能改变所指区域)

好了,temp也是volatile,我们试图这样,让编译器不要乱序。

然而,这是compiler dependent的,它可能会work,也可能不会work。如果它碰巧在我们的环境下是有效的,只能说我们是幸运的,然而它显然不可移植了。

明确大胆的说出来

不就是想防止编译器乱序吗?用了那么多volatile看得真心很烦。其实,我们需要的仅仅是一个memory barrier,防止编译器乱序。因此,我们应该显式的告诉编译器,比如这样(以gcc为例):

1
2
3
4
5
6
7
8
9
10
11
12
13
Singleton* Singleton::instance()
{
  if (pInstance == NULL) {
    LockGuard guard(yourlock);
    if (pInstance == NULL) {
      Singleton *temp = new Singleton();
      __asm__ __volatile__("" : : : "memory");
      pInstance = temp;
    }
    //guard析构,自动调用unlock
  }
  return pInstance;
}

是的,这里需要一个memory barrier,告诉编译器不要乱序。

即使这么做了,可能还是有问题。怎么说?C++标准并没有对多个线程读取同一个变量会发生什么进行任何规定。因此,上面的第3、第5、第8行存在data race,因此这是一个未定义的、不可移植的行为。

BTW,在X86下,问题不大,只要该指针是cacheline aligned即可。

让我们来分析下,在X86下,第3、5、8行的读写同步问题。假设该指针已经cacheline aligned,因此第3、5、8行的读写是原子的。

如果cpu 0上的线程1执行了第8行设置操作,在cpu 1上运行的线程2执行第3行。注意,这时候pInstance的新值对线程2不一定可见。然而这里不会有问题,因为:

1,假如可见,那么第3行if判断不成立,成功返回instance。

2,假如不可见,那么第3行if判断成立,此时pInstance实际上已经不为空,实例已经被创建。线程2尝试加锁操作,成功后,在第5行必然看到pInstance不为空,也就是看到pInstance的新值。

这是由锁的语义保证的:加锁解锁构成了一种synchronize with关系,加锁线程不管处在哪个处理器上,都可以看到上一个不管在哪个处理器上的释放锁的线程对变量的更新后的新值。

值得注意的是,为了防止第3行、第5行的读取pInstance的动作被编译器优化,可以使用ACCESS_ONCE宏。细节可以参考我的这篇博客。

正确但是不推荐的做法

写出可移植的、线程安全的Singleton,这么难么?在C++11之前,很难。非常难。

现在,有了C++11,有了C++11的memory model,就不算太难了。

熟悉memory model的同学知道,其实我们这里需要的是一个acquire release语义。设置pInstance的时候使用release语义,读取pInstance的时候使用acquire语义。简单实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Singleton
{
  public:
    static Singleton* instance();
  private:
    static atomic<Singleton*> pInstance;
};

atomic<Singleton*> Singleton::pInstance;

Singleton* Singleton::instance()
{
  Singleton* temp = pInstance.load(std::memory_order_acquire);
  if (temp == NULL) {
    LockGuard guard(yourlock);
    temp = pInstance.load(std::memory_order_acquire);
    if (temp == NULL) {
      temp = new Singleton();
      pInstance.store(temp, std::memory_order_release);
    }
    //guard析构,自动调用unlock
  }
  return temp;
}

思考:第16行能否使用memory_order_relaxed内存序?

正确且推荐的方案

看上去很复杂?嗯,不过好在,从C++11开始,我们有更简洁的方法:

1
2
3
4
5
Singleton& instance()
{
  static Singleton xx; // C++11规定,这个是线程安全的。
  return xx;
}

只有一种例外,否则我们强烈建议大家坚持使用这种简洁的方法来实现单例模式,强烈不建议自己去拨弄memory model和锁。

只有一种例外?是什么例外?请读者诸君思考(提示:考虑内存管理和生命周期)

拓展

1,查阅资料,了解POSIX中,pthread_once的原理和用法

2,使用volatile的版本中,我们对指针和指针所指之物都加了volatile,也就是:

volatile Singleton *volatile p

能否改为

volatile Singleton *p

或者

Singleton * volatile p

3,查阅资料,解释为什么编译器可能对 pInstance = new Singleton() 进行乱序

4,我们一直没有涉及的一个问题是:pInstance的初始化。它的初始化,是否也可能存在data race?如何避免?