优雅地同时遍历多个容器

问:给定容器 C1, C2, C3, … Cn, 且已知 size(C1) == size(C2) == … == size(Cn);
如何优雅地

      for (c1, c2, c3... cn) in (C1, C2, C3, ... Cn):
        // do something

意义:可能会有两个同样大小的变长容器(vector 啊 list 啊 map 啊 blabla),确认是同样大小了,希望对其中索引相同的元素做某些事情,这种需求不属于最常见的一类,但属于一般都会遇到的一类。

答:
为了直观感受 “不优雅”, “丑陋的”,展示一下现有条件下可能的实现:

    C1_T::iterator iter1 = C1.begin();
    C2_T::iterator iter2 = C2.begin();
    ...
    Cn_T::iterator itern = Cn.begin();

    for (; iter1 != C1.end(); ++iter1, ++iter2, ++iter3, ... ++itern) {
      // do something
    }

这是 C++03 的实现。简直丑得丧心病狂。另一种方法是使用索引:

    for (int i = 0; i < C1.size(); ++i) {
      // do something
    }

这个就要求 Cx 实现 [] 操作符,而 Container Concept 并不要求实现者实现 operator [],
事实上把该方法把容器限制在了 vector,array,bitset 等内从而不具有通用性。

C++11 和 14 目前没有找到可能的更好看的实现。

当然 stackoverflow 上推荐 boost 的
zip_iterator。那个确实挺优雅的,但我们要自己搞。

关于优雅的定义,那个 for … in 的代码就很优雅。当然我觉得,
iterate(fun_do_something, C1, C2, ... , Cn) 在 C++ 这样的限制条件下也是很优雅的。
前一种做法需要把 (C1, … Cn) 包装成 某种东西,其 begin 和 end (c++11 的 range based
for
要求用户实现 begin 和 end 如果有人不知道的话) 能返回 tuple(最好是
tuple),那么就可以写成如下形式:

    std::for_each(make_enumerable(C1, C2, ... Cn), [&](auto c1, auto c2, ... auto cn) {
      // do something
    });

这里的 for_each 需要 C++1z 的支持(这个叫 uniform for_each)。所以现阶段下,只能写成:

    auto e = make_enumerable(C1, C2, ... Cn)

    std::for_each(begin(e), end(e), [&]( ... ) {
      // do something
    });

这里的 make_enumerable 也是现实里没有的,必须读者自己实现。


以上和本文主旨几乎没有关系。

下面要讲的是另一种的优雅及其实现:

    iterate([&](auto c1, auto c2, ... auto cn) {
      // do something
    }, C1, C2, ... Cn);

首先给出原型:

    template <class F, class CF, class ... C>
    void iterate(F f, CF&& first, C&& ... c);

CF 表示 First Container,我们用这个 容器做 iter != end 的判断。
C … 表示余下的容器,

然后是用法:

    int a[] = {1, 2, 3, 4, 5};
    std::vector<int> b = {2, 4, 6, 8, 10};
    std::array<int, 5> = {1, 3, 5, 7, 9};

    iterate(std::plus<int>(), a, b);

    iterate([](int _1, int _2, int _3) {
      std::cout << _1+_2+_3 << std::endl;
    }, a, b, c);

经过和我妹子讨论,解决方法一共提出三种。并且也会说明,思维定势将如何使人变成傻逼。
首先,作为一个搞了一段时间元编程的较有经验的选手,我的思路是这样的:

  1. 要存 iterator,因为要对 iterator 做 ++ 操作
  2. 由 1) 得,该操作必须在运行期进行,因此,必须使用 std::tuple 存储 iterator. 因为大家基本都是这么写的。std::bind 也是这么存着变量的。并且 std::tuple 是粘合编译期和运行期的利器。
  3. 必须将 tuple 的元素展开成 f 里的参数。即: std::tuple< int, int > param, f(param)是不对的,要把 param 展开成 f(get<0>(param), get<1>(param));
  4. 以上,写一个 tuple to function parameter 的函数。

所以得到实现如下:

    template <class F, class ... T>
    void invoke_and_advance(F f, T& ... t) {
        f(*(t++)...);
    }

    template <int idx>
    struct iterate_unpack_impl {
        template <class F, class Tp, class ... Unpack>
        void operator() (F f, Tp& tp, Unpack& ... args) {
            iterate_unpack_impl<idx-1>()(f, tp, get<idx>(tp), args...);
        }
    };

    template <>
    struct iterate_unpack_impl<0> {
        template <class F, class Tp, class ... Unpack>
        void operator() (F f, Tp& tp, Unpack& ... args) {
            invoke_and_advance(f, get<0>(tp), args...);
        }
    };

    template <class F, class Tp>
    void iterate_unpack(F f, Tp& tp) {
        iterate_unpack_impl<tuple_size<Tp>::value-1>()(f, tp);
    }

    template <class F, class CF, class ... C>
    void iterate(F f, CF&& first, C&& ... c) {
        auto ifirst = begin(first);
        auto tp = make_tuple(begin(first), begin(c)...);

        while (ifirst++ != end(first)) {
            iterate_unpack(f, tp);
        }
    }

解释一下元编程基本技巧:tuple 元素展开成 function 里面的参数。

这个在标准库和 boost 里有广泛的应用。特点是看似函数很多,内联一下消失个一干二净。

原理是通过一个

  func(Tuple<Args...> tp, Extracted_Args ... args);

然后通过给 func 特化 tuple 的索引,拉出这次需要 extract 的参数。而这次弄出来的参数又会成为 args… 里的一员,传入到下一次调用中。

在上面也就是:

    template <int idx>
    struct iterate_unpack_impl {
        template <class F, class Tp, class ... Unpack>
        void operator() (F f, Tp& tp, Unpack& ... args);
    }

这里的 idx 就是 get(tuple) 中的索引,后面跟着已经拉出来的参数。
这样的实现又臭又长,显得特别装逼不让人看懂。

顺便注意一下 iterate 原型里的 完美转发。也可以是 const T&, 但这样的话就不能对 元素做操作了。同理我也没用 cbegin 和 cend。

我妹的实现如下:

    template <class F, class IterFirst, class ... Iter>
    void invoke_and_advance(F f, IterFirst first, IterFirst last, Iter ... iter) {
        if (first != last) {
            f(*first, *iter++ ...);
            invoke_and_advance(f, ++first, last, ++iter...);
        }
    }

    template <class F, class CFirst, class ... Container>
    void iterate(F f, CFirst&& fc, Container&& ... c) {
        invoke_and_advance(f, begin(fc), end(fc), begin(c)...);
    }

乍看一下是有问题的,递归,自己调自己,基本上是不让编译器做优化了。在不开 O 的时候的测试也证明,在 vector 的大小到 10w+ 的时候,就会爆调用栈。
可是我没考虑到的是,这里是尾递归,开完 O2 照样内联个一干二净,最后的汇编码和我的实现相比只差了两行。

第三个实现,致命一击:

    template <class F, class IterFirst, class ... Iter>
    void invoke_and_advance(F f, IterFirst first, IterFirst last, Iter ... iter) {
        while (first != last) {
            f(*(first++), *(iter++) ...);
        }
    }

    template <class F, class CFirst, class ... Container>
    void iterate(F f, CFirst&& fc, Container&& ... c) {
        invoke_and_advance(f, begin(fc), end(fc), begin(c)...);
    }

应该是最优雅的实现了。

以上的教训是,不要形成思维定势一样的想法,什么什么问题用什么什么手段解,元编程本身就又臭又长,如果不取点巧的话简直是没法看了。


另:我能想到的以上代码存在的问题:
1. 为了使 f 符合 callable 的定义,应该使用 invoke(f, c1, c2, …); 这样的调用。当然 invoke 还没进标准。。。
2. 返回值应该能推导。
3. *iter++ 带来的可能的性能损失。这个感觉会优化掉。没看过汇编,不知道。不懂

另,基于 C++03 这样的落后语言的情况下,手法只能是 预处理元编程生成 iterate 的 参数 1~50 的函数族,从而把整份代码隐藏在一片一片的 PP 之中了吧。

如果有 03 的限制条件下优雅漂亮的写法,求指教~

瞎搞 std::cout 的构造和析构顺序

本文使用 gcc4.8, libstdc++.

引入

昨天侃爷提了个问题:

struct foo {
    foo(const std::string& c) :_c(c) {}
    ~foo() { std::cout << _c; }
    const string& _c;
} bar("aaa") ;

会炸, 为什么呢.

因为 “aaa” 会被赋予一个临时的 string, 然后这个临时 string 在程序结束的时候早挂了, 所以会 sf.
可是, 有没有可能是 cout 先被析构, 导致整个程序挂掉?

讨论

理论上, cout 应该是早于 bar 被声明的(extern), 所以它析构的也会比 bar 晚.

肚学长提出编译器在生成可执行文件的时候做了特殊处理, 我觉得 运行库 和 标准库 这么多, 要为它们一一特判不是很现实. 应该用点力是能把 cout 搞挂掉的.

能想到的最简单的方法当然是强迫 cout 在 bar 后面初始化.

操作

#include <iostream>

struct foo {
    foo() {
        std::cout << "ctorn";
    }
} bar;

int main() {
    return 0;
}

然后 g++ -E test.cpp >test.pre.cpp

打开 test.pre.cpp, 把里面所有 extern ostream 所依赖的前置声明弄出来, 结果如下:

namespace std {
  template<class _CharT>
    struct char_traits;
template<typename _CharT, typename _Traits>
    class basic_ostream;

template<typename _CharT, typename _Traits>
    basic_ostream<_CharT, _Traits>&
    operator<<(basic_ostream<_CharT, _Traits>& __out, const char* __s);

typedef basic_ostream<char, char_traits<char> > ostream;
extern ostream cout;
}

struct foo {
    foo() {
        std::cout << "ctorn";
    }
} bar;

#include <iostream>

int main() {
    return 0;
}

g++ test.cpp && ./a.out

果然不出所料, 在 bar 构造的时候程序挂了. 说明 ostream 的构造是可以被搞挂的.

分析

cout 的构造在什么时候进行呢?

根据标准 ISO-14882/2011:

27.4.1.2

The objects are constructed and the associations are established at some time prior to or during the first time an object of class ios_base::Init is constructed, and in any case before the body of main begins execution.

注意后面这句, 这句话在 03 的标准里是没有的, 所以 03 对 ios_base::Init 的要求比较宽. 但这仍然不能阻止我们艹挂它. 因为 Init 是在 iostream 里调用的, 而 bar 的构造比 ios_base::Init init 要早…所以注定了 cout 的悲剧.

析构

这样看起来, 搞挂析构应该也非常容易才对:


namespace std { template<class _CharT> struct char_traits; template<typename _CharT, typename _Traits> class basic_ostream; template<typename _CharT, typename _Traits> basic_ostream<_CharT, _Traits>& operator<<(basic_ostream<_CharT, _Traits>& __out, const char* __s); typedef basic_ostream<char, char_traits<char> > ostream; extern ostream cout; } struct foo { foo() { // std::cout << "ctorn"; // crash } ~foo() { std::cout << "dtorn"; } } bar; #include <iostream> int main() { return 0; }

可惜这段代码活得很好很开心…
真的是编译器调整了析构顺序吗?
看一下汇编代码:

g++ -S test.cpp

    cmpl    $65535, -8(%rbp)
    jne .L31
    movl    $_ZL3bar, %edi
    call    _ZN3fooC1Ev
    movl    $__dso_handle, %edx
    movl    $_ZL3bar, %esi
    movl    $_ZN3fooD1Ev, %edi
    call    __cxa_atexit
    movl    $_ZStL8__ioinit, %edi
    call    _ZNSt8ios_base4InitC1Ev
    movl    $__dso_handle, %edx
    movl    $_ZStL8__ioinit, %esi
    movl    $_ZNSt8ios_base4InitD1Ev, %edi
    call    __cxa_atexit

先构造, 再把 $_ZN3fooD1Ev 也就是 ~foo 注册给 __cxa_atexit 也就是 atexit, 然后才是 ioinit, ios_base::Init (所以之前构造的时候要是调 cout 就妥妥跪了), 然后才把 ios_base::~Init() 注册给 atexit

也就是说, 顺序应该不是问题. 析构的时候早该爆了.

感觉搞不下去了, 于是静态链接一下, 做 objdump:

g++ -static-libstdc++ test.cpp && objdump -S a.out > dis.s

打开直接搜 foo, 发现如下段:

  408bd8:   bf a4 14 68 00          mov    $0x6814a4,%edi
  408bdd:   e8 4a 00 00 00          callq  408c2c <_ZN3fooC1Ev>
  408be2:   ba 88 d8 45 00          mov    $0x45d888,%edx
  408be7:   be a4 14 68 00          mov    $0x6814a4,%esi
  408bec:   bf 36 8c 40 00          mov    $0x408c36,%edi // _ZN3fooD1Ev
  408bf1:   e8 aa fb ff ff          callq  4087a0 <__cxa_atexit@plt>
  408bf6:   bf a5 14 68 00          mov    $0x6814a5,%edi
  408bfb:   e8 d0 81 01 00          callq  420dd0 <_ZNSt8ios_base4InitC1Ev>
  408c00:   ba 88 d8 45 00          mov    $0x45d888,%edx
  408c05:   be a5 14 68 00          mov    $0x6814a5,%esi
  408c0a:   bf a0 16 42 00          mov    $0x4216a0,%edi // _ZNSt8ios_base4InitD1Ev
  408c0f:   e8 8c fb ff ff          callq  4087a0 <__cxa_atexit@plt>
  408c14:   c9                      leaveq 
  408c15:   c3                      retq   

除了把地址定下来了好像也没啥区别, 搜一下 4216a0, 也就是 ios_base::~Init 的地址:

发现有好多处出现, 另一个地方是

0000000000421740 <_ZNSt8ios_base15sync_with_stdioEb>:

也就是 ios_base::sync_with_stdio

于是修改程序如下:

namespace std {
  template<class _CharT>
    struct char_traits;
template<typename _CharT, typename _Traits>
    class basic_ostream;

template<typename _CharT, typename _Traits>
    basic_ostream<_CharT, _Traits>&
    operator<<(basic_ostream<_CharT, _Traits>& __out, const char* __s);

typedef basic_ostream<char, char_traits<char> > ostream;
extern ostream cout;
}

struct foo {
    foo() {
        // std::cout << "ctorn"; // crash
    }
    ~foo() {
        std::cout << "dtorn";
    }
} bar;

#include <iostream>

int main() {
    std::ios_base::sync_with_stdio(false);
    return 0;
}

这回终于有不同了, 可惜不是挂了, 而是没有输出.
为啥 cout 如此如此坚挺呢? 只能通过标准和源码找答案了

标准

27.4.1.2
The objects are not destroyed during program execution.

cout 在程序执行期间不会被 destroy

如何做到?

实现

实现很屌的啊, 闻所未闻啊.

首先看 Init:

    new (&buf_cout_sync) stdio_sync_filebuf<char>(stdout);
    new (&buf_cin_sync) stdio_sync_filebuf<char>(stdin);
    new (&buf_cerr_sync) stdio_sync_filebuf<char>(stderr);

    // The standard streams are constructed once only and never
    // destroyed.
    new (&cout) ostream(&buf_cout_sync);
    new (&cin) istream(&buf_cin_sync);
    new (&cerr) ostream(&buf_cerr_sync);
    new (&clog) ostream(&buf_cerr_sync);

在 cout 的地址上调用 placement new 构造 ostream.

然后 ~Init 里, 根本不析构 cout.

  ios_base::Init::~Init()
  {
    // Be race-detector-friendly.  For more info see bits/c++config.
    _GLIBCXX_SYNCHRONIZATION_HAPPENS_BEFORE(&_S_refcount);
    if (__gnu_cxx::__exchange_and_add_dispatch(&_S_refcount, -1) == 2)
      {
        _GLIBCXX_SYNCHRONIZATION_HAPPENS_AFTER(&_S_refcount);
    // Catch any exceptions thrown by basic_ostream::flush()
    __try
      { 
        // Flush standard output streams as required by 27.4.2.1.6
        cout.flush();
        cerr.flush();
        clog.flush();

#ifdef _GLIBCXX_USE_WCHAR_T
        wcout.flush();
        wcerr.flush();
        wclog.flush();    
#endif
      }
    __catch(...)
      { }
      }
  } 

PS: 汇编里一大坨 unwind. 说明编译器还是认为有可能丢异常的.

这样如果不 delete, cout 就不会被析构.

真的是这样的吗?…

假的……如果 cout 是一个 ostream 类型, 那么无论如何它也逃不出被析构的命运.

可是假如 cout 不是 ostream 类型呢?…

来看 cout 的 定义之地

  typedef char fake_istream[sizeof(istream)]
  __attribute__ ((aligned(__alignof__(istream))));
  typedef char fake_ostream[sizeof(ostream)] 
  __attribute__ ((aligned(__alignof__(ostream))));
  fake_istream cin;
  fake_ostream cout;
  fake_ostream cerr;
  fake_ostream clog;

ostream 是一个 char array 类型, 具有和 ostream 相同的 size 以及相同的 align. 因为 是 char array, 所以没有什么东西会在上面做析构, 也保证了 cout 在程序结束之前不会被 destroy.

那该怎么去搞它? 想不出来了 @_@

其他福利

libstdc++ 的 sync_with_stdio 实现有 bug, 设为 false 以后就不能再设回来了…虽然不明白为什么这么做…不过应当可以去报一个 bug…

scope_exit 的 11 实现

#ifndef SCOPE_EXIT_HPP
#define SCOPE_EXIT_HPP
#include "pp_cat.h"
#include <functional>

#define SCOPE_EXIT_CLASSNAME(id) PP_CAT(__unique_classname__, id)
#define SCOPE_EXIT_INSTANCENAME(id) PP_CAT(__unique_instance__, id)

#define SCOPE_EXIT(...) struct SCOPE_EXIT_CLASSNAME(__LINE__) {
    typedef std::function<void(void)> fun; 
    fun f_; 
            
    SCOPE_EXIT_CLASSNAME(__LINE__) (fun f) :f_(f) { } 
    ~SCOPE_EXIT_CLASSNAME(__LINE__) () { f_(); } 
} SCOPE_EXIT_INSTANCENAME(__LINE__) ([__VA_ARGS__]()

#define SCOPE_EXIT_END );

#endif // SCOPE_EXIT_HPP

设计思路见 boost. 主要的方法是利用 析构函数, 在退出 scope 时自动执行代码.
1. 为了使 SCOPE_EXIT 里的代码不自动执行, 预处理是必须的. 为了使 SCOPE EXIT 里的代码能迟滞执行, 必须保存它到一个函数, 经由析构函数调用它. 函数里面定义函数是非法的, 但函数里面定义结构体是合法的.

所以: 可以在函数里面定义结构体. (由此就找到一个存放代码的方式了)

2, 为了使一个 SCOPE 里能有多个 SCOPE_EXIT, 必须给每一个 instance 一个特殊的 id.
这个 id 不由用户指定, 解决方法 只能是 预处理.

不能使用自增式的 macro 的方式使得 每一个 instance 有唯一标号. 理由是, 目前, 自增式的 macro 只能通过 #include 来拿到

e.g

#include "pp_auto_inc.h" // expand to nothing
PP_AUTO_INC // expand to 1
#include "pp_auto_inc.h"
PP_AUTO_INC // expand to 2

这条排除后很难想象有啥东西能提供唯一的 id. 但其实有一个宏很好用. __LINE__. 写入标准, 所以代码完全合法. 能保证在该编译单元内只有唯一 id. 还要注意, PP_CAT 是让 __LINE__ 被显式 expand 所必须的

最后是改进: 我的代码不够好, 可以改的地方毛想想还挺多的. 因为是 C++11, 实际上 可以把定义在 scope 里的结构体单独弄出来了. 而且 scope_exit_end 这个宏应该是能消除掉的. 然后还没考虑过在 namespace, class 这种地方能不能用( 应该不能让它能用? )

下面是测试代码

#include <iostream>
#include "scope_exit.hpp"
using namespace std;

int* foo() {
    int* p;
    const char * str = "hello world";

    SCOPE_EXIT(&p) {
        cout << "dtor1n";
        cout << p << endl;
        delete p;
        p = 0;
        cout << p << endl;
        cout << "dtor1 endn";
    } SCOPE_EXIT_END

    SCOPE_EXIT(&) {
        cout << str << endl;
        str = nullptr;
    } SCOPE_EXIT_END

    p = new int(10);
    cout << p << endl;
    cout << *p << endl;

    return p;
}

int main() {
    cout << foo();
}

scanf 相关

好像集训队的学长们对这个都很熟. 我啥都不懂在乱用土死了.
集合匹配用 []

逆向匹配用 [^]

比方

scanf("%[ab,]", str); // 1
scanf("%[^;]",str); // 2

1 匹配 所有ab和逗号.  “aabbb,ab,aaacasdsdef”
2. 匹配除了分号外的所有字符 “aaa;bbbbccc”

  • 表示匹配但不赋值.
scanf("%[ab,]");

然后 http://en.cppreference.com/w/cpp/types/integer

用法 “%” SCNd64 比如. 貌似是 C99 的标准…看了下源码其实就是各种平台相关 define…

嗯差不多就这样 简直太土了…

流水帐

重装系统的诱因是 mobaXterm 不尽如人意。
再说公司电脑不可能用来玩游戏,用 windows 就没啥诱惑力了。

早上研究了半天从 mobile device (更准确地说,我华为手机) 安装 linux,发现还是有点麻烦的,于是弃了。借了个 U 盘装 fedora. 1.

  1. 阿里旺旺没有 fedora 版本的,有一个从 deb 包 ebuild 过来的,但未知原因报错
    file / from install of nixnote-1.6-2.x86_64 conflicts with file from package filesystem-3.2-19.fc20.x86_64 
    file /usr/bin from install of nixnote-1.6-2.x86_64 conflicts with file from package filesystem-3.2-19.fc20.x86_64 
    

类似这种的。

于是强制解开,手动拷贝到相应目录下。
2. evernote 没有 linux 版本的。但 nevernote 还可以用。出于相同原因,必须暴力安装
3. ssh 的 multiplexing 能用,于是很爽。
4. 终于用上 zsh 了。也非常爽。
5. fedora 的字体设置不好看,我跟从 这篇文章 某个回复的指导搞定了。
6. 以后要用 ibus-daemon 的时候启动一下,不用的时候 ctrl-c 掉,蛤蛤蛤蛤蛤

cygwin 下的 ssh multiplexing

结论是不能用,蛤蛤蛤


某司最近猛抓安全,登陆 ssh 都要输动态密码了。

坑爹嘛。

大家纷纷想办法登陆时只输一次密码,后续就不需要再输入了。其中的代表有 securecrt 的 clone session 和 openssh 的 multiplexing
而功能很好很强大的 mobaXterm 竟然直接没有这个选项!

我师傅期待证明 securecrt 比 mobaXterm 好用已久,以往常难以得逞,这次难道他要大功告成?

于是搜到 multiplexing. 用了一下,竟然会 mux_client_request_session: send fds failed

于是网上搜啊搜啊搜,发现信息如下:

https://sourceware.org/ml/cygwin/2005-11/msg00737.html
https://sourceware.org/ml/cygwin/2005-10/msg00672.html
https://cygwin.com/ml/cygwin/2010-08/msg00088.html
https://github.com/ansible/ansible/issues/6363

意思是,Cygwin still doesn’t support descriptor passing via sendmsg/recvmsg.

好了问题来了:现在已知 mobaXterm is based on cygwin,请问mobaXterm 是否支持 duplicate session / multiplexing ?

ls after every cd

20150109 update:

因为有时候主目录下东西特别多, cd 进去很慌, 会 ls 出一大堆东西, 所以想法子做了限制, 只有文件数量少于 100 的时候 ls, 数量太多就默认不 ls 了.所以 改正如下:

builtin cd “$*” && (( $(ls -l | wc -l) < 100 )) && ls


想了想标题实在不知所云,需求很简单,在 cd 之后自动打个 ls, 仅此而已

function cd()
{
     builtin cd "$*" && (( $(ls -l | wc -l) < 100 ))  && ls
}

是比较好的解法。

之前想着 hook 每一个命令,根据这个这个有如下做法:

preexec () { :; }
preexec_invoke_exec () {
    cmd=$(echo $BASH_COMMAND | cut -f 1 -d ' ');
    if [ "$cmd" == "cd" ]; then
        ls
    fi
}
trap 'preexec_invoke_exec' DEBUG

但会发现 ls 在 cd 实际发生前发生,所以等于白搭。要继续 hack 的话嫌麻烦。想想 cd 后 ls 应该不是什么很奇怪的需求,所以在 这里 找到了解法。

anyway, 貌似把 bashrc 里的某个系统自动加的表达式去了以后 sftp 突然能用了,所以 mobaXterm 现在会主动列出文件列表,真是一个莫名的惊喜。

但是事实上,这样就剥夺了 cd 后 ls 的惯性带来的乐趣,使人难以适应。不过大约过几天会好~

libcxx 的 shared_ptr 实现分析

##以 libcxx 为对象的 shared_ptr 实现分析。##

注意:gcc 用的是 libstdc++ , 这里的 libcxxllvm 的项目. 两者的 shared_ptr 实现相似但不相同

一个 shared_ptr/weak_ptr 存放两个指针,一个是被 manage 的 Tp* ptr, 一个是管理块, cntrl.
所以一个 shared_ptr/weak_ptr 的 空间开销 = 2*sizeof pointer。待会再说 cntrl 的空间开销。
访问 ptr 的时间开销是 1,没有性能损失。

ptr 没什么好说,每一个 shared_ptr/weak_ptr 存一份。那么 cntrl 呢?
首先说 cntrl 干什么。
cntrl 块存

  1. shared_count
  2. weak_count
  3. allocator
  4. deleter
  5. ptr

这里的 weak_countweak_ptr 引用计数。待会会发现,shared_ptrweak_ptr 紧密耦合,互不可分。所以谈 shared_ptr 的实现就必须谈 weak_ptr 的实现。
反观 unique_ptr 则自成体系,与这两者无关。

画框架图一个先:
_
所以其实 shared_ptr 和 weak_ptr 的好多操作都是 cntrl 代理的。

cntrl 是什么:

cntrl 是一个叫 __shared_weak_count 的对象。而这个__shared_weak_count 是纯虚类,继承自 __shared_count.


__shared_weak_count__shared_count 干什么呢?

__shared_weak_countweak_count. __shared_countshared_count.

shared_count 做什么呢?存一共多少个 shared_ptr 共享 这个资源。怎么计数的呢?人头-1。也就是说,shared_count == 0 的时候,一共有一个 shared_ptr 占有这个资源。

接下来,__shared_count 还提供了对 shared_count 做更改的原子操作。__add_shared__release_shared, 以及做资源释放的 __on_zero_shared

__shared_weak_count 则继承自 __shared_count. 同样,有 __add_weak, __release_weak 以及做资源释放的 __on_zero_shared_weak.


现在 weak_countshared_count 都有了,但 *ptr, Deleter 和 Alloc 存哪里呢?前面说了 shared_weak_count 是纯虚类,接下来讲 shared_weak_count 的两个实现类。
这两个类叫 shared_ptr_pointer, 以及 shared_ptr_emplace。这两者的区别仅仅在于后者没有 deleter (那谁来释放资源呢?当然是 Alloc.deallocate 函数啊),所以以后只取前者分析就好了。
下一张图:cntrl 的类图:
_1


接下来是核心:资源的构造和析构。

weak_ptr 能 hold 资源吗?答案是不能。
所以一定是 shared_ptr 才能持有资源,也就是构造和析构资源。

资源的构造

一共 下面几种 情况:

  1. hold 一个已有资源:
    比如 shared_ptr<T> p(raw_pointer); 有2, 3, 4, 12, 13 五种情况
  2. 拷贝自另一个 shared_ptr.
    比如 shared_ptr p(another_shared_ptr); 有 8, 9, 10 三种情况
  3. 构造一个空 shared_ptr <nullptr>
    比如 shared_ptr<T> p; 有 1, 5, 6, 7 四种情况

  • hold 已有资源:
    这种情况下:cntrl 没有初始化。首先 new 一个 cntrl 出来。然后根据 deletor 和 allocator 还有 ptr, 设置 shared_weak_pointer 的各项值。

为了显示我读过代码,我们拎一段代码出来看一下吧!

template<class _Tp>
template<class _Yp>
shared_ptr<_Tp >::shared_ptr(_Yp* __p ,
                            typename enable_if<is_convertible<_Yp *, element_type*>:: value, __nat >::type)
    : __ptr_ (__p)
{
    unique_ptr<_Yp > __hold(__p);
    typedef __shared_ptr_pointer <_Yp*, default_delete<_Yp >, allocator<_Yp> > _CntrlBlk;
    __cntrl_ = new _CntrlBlk(__p, default_delete <_Yp>(), allocator<_Yp>());
    __hold.release ();
    __enable_weak_this (__p);
}

原型:shared_ptr(Yp* p)
解释一下 enable_if
后面那个 enable_if 是做条件检查的,基于 SFINAE。如果 Yp* 能 convert 成 element_type*, 则这个函数有定义(否则没有定义),且签名是 shared_ptr::shared_ptr(Yp* p, __nat)
__nat 是 not a type 的缩写。显然函数在声明的时候是给它赋了默认值的,这样函数看起来像 shared_ptr(Yp*) 一样了。
这么做是怕函数体里面出现类型转换错误导致编译不通过。

解释一下 unique_ptr 作甚:
这边有一个 new, new 跪了 p 是要侧漏的。先用 unique_ptr 存住就不怕 ctor 中途异常 p 没人释放了。这里插播一下,unique_ptr 除此之外还有 exception safe 的功能。常用来和 placement new 配合无痛~~人流(误~~申请空间。在 头文件 functional 里可以找到一些实例。

enable_weak_this 基本有两份重载,shared_ptr_pointer 版本没记错的话是不干活的(空函数)。不想深究。

这里注意一下 weak_countshared_count. 它们的值都是 0. 对于 shared_ptr 这是对的(use_count = shared_count+1),因为现在有一个 shared_ptr hold 资源。那对于 weak_ptr 呢?现在明明没有 weak_ptr 指向 cntrl 啊?
其实 shared_ptr 组合本身占用一个 weak_count. 等 shared_ptr 死光了,就会减一个 weak_count. 所以这里的 weak_count 也还是 0,而不是 -1. 这个待会还会提到。

  • 拷贝自另一个 shared_ptr:
    很简单啊!引用计数+1 就好了

  • 构造一个空 shared_ptr:
    很简单啊!cntrl 赋成 0(NULL) 就好了


构造完了是析构:

析构 shared_ptr 有四种情况:
1. shared_ptr 死完了。weak_ptr 也死完了
2. shared_ptr 死完了。weak_ptr 没死完
3. shared_ptr 本身就是 empty.
4. shared_ptr 没死完,weak_ptr 也没死完


  • shared_ptr 死完了。weak_ptr 也死完了

这种情况的特征是:shared_count = -1. weak_count = 0

首先,release_shared 被调用

void
__shared_weak_count::__release_shared () _NOEXCEPT
{
    if (__shared_count::__release_shared())
        __release_weak ();
}

看到调用了 __shared_count::__release_shared()

bool
__shared_count::__release_shared () _NOEXCEPT
{
    if (decrement(__shared_owners_) == - 1)
    {
        __on_zero_shared ();
        return true ;
    }
    return false;
}

decrement 是原子减。
如果没人持有这个资源了: __on_zero_shared()

template < class _Tp, class _Dp, class _Alloc>
void
__shared_ptr_pointer<_Tp , _Dp, _Alloc>::__on_zero_shared () _NOEXCEPT
{
    __data_.first ().second()( __data_.first ().first());
    __data_.first ().second().~ _Dp();
}

这个 data, 就是 cntrl 里的 <<Tp, Dp>, Alloc> compress_paircompress_pairEBO(empty base-class optimize) 的空类做压缩。
first().second() 就是 deleter, first().first() 就是 ptr. 所以第一行用人话说就是 deleter(ptr). 析构资源。
第二行用人话说就是 ~deleter(); deleter 自杀

然后回去 shared_weak_count::release_sharedrelease_weak(),
这个函数只在 shared_ptr 死光的情况下执行,执行的首要结果就是 weak_count - 1. 也就是没有 shared_ptr 指向这个资源的时候 weak_count 会 – 1.

void
__shared_weak_count ::__release_weak () _NOEXCEPT
{
    if ( decrement (__shared_weak_owners_ ) == - 1 )
        __on_zero_shared_weak ();
}

然后判断 weak_shared 死光了没。死光了,那就 on_zero_shared_weak (没死光?那就把 cntrl 留着,ptr 挂了。这里可以看到,ptr 的释放时间是 shared_ptr 死光光的时候。而 cntrl 的释放时间是 weak_ptrshared_ptr 都死光光的时候)

template < class _Tp, class _Dp , class _Alloc >
void
__shared_ptr_pointer <_Tp , _Dp, _Alloc >::__on_zero_shared_weak () _NOEXCEPT
{
    typename _Alloc :: template rebind < __shared_ptr_pointer >::other __a( __data_ .second ());
    __data_ .second ().~ _Alloc();
    __a .deallocate ( this , 1);
}

这里首先把 alloc rebind 到自己身上。然后把原先的 alloc 析构。然后把自己 delete 掉。这样就把 cntrl 干掉了。

  • shared_ptr 死完了。weak_ptr 没死完

    见上

  • shared_ptr 本身就是 empty.

    if (cntrl) 才干活

  • shared_ptr 没死完,weak_ptr 也没死完

    release_shared();

然后是 weak_ptrshared_ptr 的转换,要加锁,其实现是:

__shared_weak_count *
__shared_weak_count ::lock () _NOEXCEPT
{
    long object_owners = __shared_owners_ ;
    while ( object_owners != -1 )
    {
        if ( __sync_bool_compare_and_swap (&__shared_owners_ ,
                                         object_owners ,
                                         object_owners + 1))
            return this ;
        object_owners = __shared_owners_ ;
    }
    return 0 ;
}

主要是防止提升期间 ptr 已经跪了。

接下来分析时间和空间代价。(这个我不擅长啊 T T

  • 新建 shared_ptr 和 cntrl.

    shared_ptr 占 2*sizeof(pointer)
    cntrl 有两个 count, sizeof(long)*2. alloc, deleter 和 ptr 被压缩了,理想情况(dp ap 都是空)应该是 sizeof(pointer)+1 (?不确定,设为 DA 吧),其实一般都不会空(Deleter 就一般是 callable object)。一个虚表,sizeof(pointer). 总共,至少,有 sizeof(pointer)*3+sizeof(long)*2,外加 dp 和 alloc 占用的空间。在 64 位系统上应该是 3*8+2*8+DA = 40+DA (byte)

    时间上,

    1. unique_ptr(p);
    2. new
    3. unique_ptr.release();

三步操作。


  • 读取 ptr 与普通指针相比没有额外消耗。

  • 新建 shared_ptr (加一个 shared_count

两步操作:
1. 判断 cntrl 是否是 null.
2. cntrl->add_shared(); 这个不是虚函数,直接可搞。其最终调用是 __sync_add_and_fetch(&t, 1);据说是 lock_free 的?不造啊不好说啊
https://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
https://news.ycombinator.com/item?id=1687075


  • 析构一个 shared_ptr, shared_ptr 没死完
  1. 两个函数调用 (release_shared)
  2. 两个判断
  3. 一次原子自减

  • 析构一个 shared_ptr, shared_ptr 死完了,weak_ptr 也死完了

    先杀 ptr, 再杀 cntrl. 每次都要判断. 还有两个原子减操作。不想细想了。。。


另及:原子增减操作都涉及到 memory barrier。在访问频繁的情况下容易造成性能问题

又及:本文不要脸地抄自我自己