0%

python-IO密集任务/计算密集任务/多线程/多进程

Python 作为编程语言存在多个具体实现,包括最常用的 CPython、超集 Cython、.NET 平台的 IronPython、JVM 上的 Jython,R 语言实现的 RPython、JIT 版本的 PyPy 等等。这里我们只讨论最常用的、官方的 CPython 实现。

GIL是CPython解释器引入的锁,GIL在解释器层面阻止了真正的并行运行。解释器在执行任何线程之前,必须等待当前正在运行的线程释放GIL。事实上,解释器会强迫想要运行的线程必须拿到GIL才能访问解释器的任何资源,例如栈或Python对象等。这也正是GIL的目的——阻止不同的线程并发访问Python对象。这样GIL可以保护解释器的内存,让垃圾回收工作正常。但事实上,这却造成了程序员无法通过并行执行多线程来提高程序的性能。如果我们去掉CPython的GIL,就可以让多线程真正并行执行。GIL并没有影响多处理器并行的线程,只是限制了一个解释器只能有一个线程在运行

  • 引入GLC的好处:

    • 单线程情况下更快。
    • 瓶颈在于 I/O 的多线程环境下更快。
    • CPU 耗时操作发生在 C 库调用上时更快。
    • 编写 C 扩展会更容易:除法你手动指定,否则不会发生 Python 线程切换的问题。
    • 封装 C 库变得更容易,因为不需要考虑线程安全问题。如果该库不是线程安全的,你只需要保证调用时 GIL 是锁定的。

线程 VS 进程:

  • 进程是应用程序的一个执行实例,比如,在桌面上双击浏览器图标将会运行一个浏览器。
  • 线程是一个控制流程,可以在进程内与其他活跃的线程同时执行。“控制流程”指的是顺序执行一些机器指令。
  • 进程可以包含多个线程,所以开启一个浏览器,操作系统将创建一个进程,并开始执行这个进程的主线程。每一个线程将独立执行一系列的指令(通常就是一个函数),并且和其他线程并行执行。
  • 同一个进程内的线程可以共享一些地址空间和数据结构,所以线程也被称作“轻量进程”
  • IO密集型任务 VS 计算密集型任务 > - 所谓IO密集型任务,是指磁盘IO、网络IO占主要的任务,计算量很小。典型的如请求网页、读写文件 > - 所谓计算密集型任务,是指CPU计算占主要的任务,CPU一直处于满负荷状态。比如在一个很大的列表中查找元素

对于IO密集型任务我们采用多线程,对于计算密集型任务,我们多采用多进程进行并行计算加速

函数调用

什么是栈帧(stack frame)

栈帧是存储函数调用活动的实体,每一次函数的调用,都会在调用栈(call stack)上维护一个独立的栈帧(stack frame).每个独立的栈帧一般包括:

  1. 函数的返回地址和参数
  2. 临时变量: 包括函数的非静态局部变量以及编译器自动生成的其他临时变量
  3. 函数调用的上下文栈是从高地址向低地址延伸,一个函数的栈帧用ebp 和 esp 这两个寄存器来划定范围.ebp 指向当前栈帧的底部,esp 始终指向栈帧的顶部;ebp 寄存器又被称为帧指针(Frame Pointer);esp 寄存器又被称为栈指针(Stack Pointer)

函数调用过程

在函数调用的过程中,有函数的调用者(caller)和被调用的函数(callee). 调用者需要知道被调用者函数返回值; 被调用者需要道传入的参数和返回的地址,函数调用分为以下几步:

  1. 参数入栈: 将参数按照调用约定(C 是从右向左)依次压入系统栈中;
  2. 返回地址入栈: 将当前代码区调用指令的下一条指令地址压入栈中,供函数返回时继续执行;
  3. 代码跳转: 处理器将代码区跳转到被调用函数的入口处;
  4. 栈帧调整:

    1. 将调用者的ebp压栈处理,保存指向栈底的ebp的地址(方便函数返回之后的现场恢复),此时esp指向新的栈顶位置push ebp
    2. 将当前栈帧切换到新栈帧(将eps值装入ebp,更新栈帧底部), 这时ebp指向栈顶,而此时栈顶就是old ebp mov ebp, esp
    3. 给新栈帧分配空间 sub esp, XXX

函数返回

函数返回分为以下几步:

  1. 保存被调用函数的返回值到 eax 寄存器中 mov eax, xxx
  2. 恢复 esp 同时回收局部变量空间 mov ebp, esp
  3. 将上一个栈帧底部位置恢复到 ebp pop ebp
  4. 弹出当前栈顶元素,从栈中取到返回地址,并跳转到该位置 ret

函数里面要用到数据,如果数据属于性线程级别的(比如函数形参-->局部变量-->存在栈上-->每个线程都有自己的栈),那么多线程同时调用是没关系的,因为用的都是本线程的数据;但是如果函数用到一些全局数据,比如全局变量,根据堆内存首地址去访问的堆内存(形参传入的),同时操作一个数据结构(如对一个链表的操作),静态局部变量,那就存在数据安全问题,必须要加锁对函数访问加锁。

因此需要互斥处理的,一般是函数中有全局变量,有动态申请的空间,有静态局部变量,有需要进程数据循环发送之类的操作需要进行互斥处理。

线程安全函数和可重入函数

线程安全的(Thread-Safe):如果一个函数在同一时刻可以被多个线程安全地调用,就称该函数是线程安全的。线程安全函数解决多个线程调用函数时访问共享资 源的冲突问题。 可重入(Reentrant):函数可以由多于一个线程并发使用,而不必担心数据错误。可重入函数可以在任意时刻被中断,稍后再继续运行,不会丢失数据。可重入   性解决函数运行结果的确定性和可重复性。可重入函数编写规范为:

  • 不在函数内部使用静态或全局数据 
  • 不返回静态或全局数据,所有数据都由函数的调用者提供。 
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
  • 如果必须访问全局变量,利用互斥机制来保护全局变量。
  • 不调用不可重入函数。

两者之间的关系:

  • 一个函数对于多个线程是可重入的,则这个函数是线程安全的。
  • 一个函数是线程安全的,但并不一定是可重入的。
  • 可重入性要强于线程安全性。

python的函数传参

在讨论python的函数是如何传参之前,我们先讨论一下c/c++的函数传参方式, 对于c语言来说只有一种 call-by-value, 简单说就是实参会如实拷贝至形参,而c++会多了几种,下面请看:

c/c++函数传参

  1. 按值传参 按值传参的概念非常好理解,就是函数接收到了传递过来的参数后,将其拷贝一份,其函数内部执行的代码操作的都是传递参数的拷贝。也就是说,按值传参最大的特点就是不会影响到传递过来的参数的值,但因为拷贝了一份副本,会更浪费资源一些。
  2. 按(左值)引用传参 简单来说形参是实参的别名
  3. 按常量引用传参,比如下面的例子
    1
    string randomItem( const vector<string> & arr );
  4. 按右值引用传参 C++11 全面引入移动语义:
    1
    2
    3
    x = y
    can be a copy if y is an lvalue,
    but a move if y is an rvalue.
    调用拷贝构造函数主要有以下场景:
  • 对象作为函数的参数,以值传递的方式传给函数。 
  • 对象作为函数的返回值,以值的方式从函数返回
  • 使用一个对象给另一个对象初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
using namespace std;
class A
{
public:
A():m_ptr(new int(0)){cout << "construct" << endl;}
A(const A& a):m_ptr(new int(*a.m_ptr)){ cout << "copy construct" << endl; }
A(A&& a) :m_ptr(a.m_ptr) {
a.m_ptr = nullptr;
cout << "move construct" << endl;
}
~A(){ delete m_ptr;}
private:
int* m_ptr;
};
A GetA(){
return A();
}
int main() {
A a = GetA();
return 0;
}

输出结果:

1
2
3
construct
move construct
move construct

使用 -fno-elide-constructors 选项编译上述代码,可以关闭编译器的ROV优化,输出结果表明,并没有调用拷贝构造函数,只调用了move construct函数。

这就是移动语义,需要注意的一个细节是,我们提供移动构造函数的同时也会提供一个拷贝构造函数,以防止移动不成功的时候还能拷贝构造。更详细的内容参考这篇文章 - 从4行代码看右值引用

python的name binding

python中一切皆对象, a = 1 这条语句就是把名字a绑定在1这个对象上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
a = [1,2,3]
def foo(b):
print(b is a)
b.append(4)
print(b is a)
foo(a) # 会打印出 True True
print(a) # [1,2,3,4]

def bar(c):
print(c is a)
c = [0,0,0]
print(c is a)
bar(a) # 会打印出 True False
print(a) # [1,2,3,4]
  • 刚开始执行foo(a)的时候,名字b与a绑定的的是同一个list,所以 b is a 返回True,然后执行b.append(4), 实际是对它俩绑定的那个list对象进行操作,执行完以后,它俩仍然绑定这个list对象,所以还是返回True。那么在foo函数执行完以后,通过a去引用这个list对象,它的内容就是1,2,3,4.
  • 再看bar(a)的执行情况,刚开始名字c 和 a 都是绑定这个list对象,所以 c is a返回True. 然后执行c = [0,0,0],表明名字c 绑定到了另外一个list对象上了,而名字a 仍然绑定着原来的那个list对象。所以 c is a 返回False,bar函数执行完以后,通过名字a引用到的那个list的内容还是包含1,2,3,4.

我们把python中的这种引用方式称作句柄引用, 和c++中的别名引用有一定区别

python的multiprocessing

multiprocessing模块提供了Process能让我们通过创建进程对象并执行该进程对象的start方法来创建一个真正的进程来执行任务,该接口类似threading模块中的线程类Thread.

但是当被操作对象数目不大的时候可以使用Process动态生成多个进程,但是如果需要的进程数一旦很多的时候,手动限制进程的数量以及处理不同进程返回值会变得异常的繁琐,multiprocessing模块提供了一个进程池Pool类,负责创建进程池对象,并提供了一些方法来讲运算任务offload到不同的子进程中执行,并很方便的获取返回值。multiprocessing.pool提供以下多种方式可以用来将任务分给到各个子进程:

Multi-args Concurrence Blocking Ordered-results
Pool.apply No No Yes Yes
Pool.apply_async No Yes No No
Pool.map No Yes Yes Yes
Pool.map_async No Yes No Yes
Pool.starmap Yes Yes Yes Yes
Pool.stamap_async Yes Yes No Yes

注意:

  • Pool.imap and Pool.imap_async – lazier version of map and map_async.
  • Pool.starmap 和Pool.map 相比可以接受多参数函数
  • Async methods submit all the processes at once and retrieve the results once they are finished. Use get method to obtain the results.
  • Pool.map(or Pool.apply)methods are very much similar to Python built-in map(or apply). They block the main process until all the processes complete and return the result.
  • 需要说明的是,最后一列Ordered-results表示各个方法返回的结果是否是有序的, 不是表示各个函数的返回值是否是按照调用的顺序返回的

chunksize影响优化效率

现在有一个需要加速的计算任务, xl是一个list of set, 我们要判断yl列表中的每一个元素是否存在set内, 两层for循环嵌套计算,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import numpy as np
import time
import multiprocessing

if __name__ == '__main__':

xl = [set(map(lambda x: str(x), np.random.randint(400, size=100))) for i in range(80000)]
yl = list(map(lambda x: str(x), np.random.randint(400, size=400)))

#单进程执行
local_time = time.time()
print("start pf time ", local_time)
res = [ iy in ix for iy in yl for ix in xl]
end_time = time.time()
print("pf during time ", end_time - local_time)
print(len(res), res[0:8])

输出:

1
2
3
4
multiprocessing.cpu_count() 4
start pf time 1585446005.5070348
pf during time 6.3575873374938965
32000000 [False, False, False, False, False, False, True, False]
我们使用pool.starmap进行并行加速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import numpy as np
import time
import multiprocessing
def func_task(x, yl): return [y in x for y in yl]
if __name__ == '__main__':

print("multiprocessing.cpu_count()", multiprocessing.cpu_count())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
xl = [set(map(lambda x: str(x), np.random.randint(400, size=100))) for i in range(80000)]
yl = list(map(lambda x: str(x), np.random.randint(400, size=400)))

#使用starmap
task = [(ix, yl) for ix in xl]
local_time = time.time()
print("start pf time ", local_time)
results = pool.starmap(func_task, task)
end_time = time.time()
print("pf during time ", end_time - local_time)
print(len(results), results[0][0:8])

输出:

1
2
3
4
multiprocessing.cpu_count() 4
start pf time 1585446134.5832891
pf during time 7.450953006744385
80000 [False, False, False, False, False, False, False, False]
居然变慢了,这是怎么回事? 带着这样的疑问查阅了相关资料 python-multiprocessing-understanding-logic-behind-chunksize

我的理解是合适的chunksize与具体的问题规模以及计算量相关, 不存在普适的公式,原文说:

More chunks mean more overhead, but increased scheduling flexibility. How this answer will show, this leads to a higher worker-utilization on average, but without the guarantee of a shorter overall computation time for every case.

更过的chunks意味着更多的资源消耗,但是会提升scheduling flexibility,这再一定程度提升了平均cpu使用, 但是不能确保整体time消耗降低

据此我尝试了chunksize分别是10,100,1000,10000的情况,结果如下:

chunksize time consumption (seconds)
10 5.82
100 6.18
1000 5.61
10000 9.42

在chunksize=256时取得了4.8S的时间消耗

小结

  • 函数中有全局变量,动态申请的空间,静态局部变量在同进程下的多线程需要考虑互斥
  • IO密集任务使用多线程 计算密集任务使用多进程
  • python多进程并行计算使用时需要尝试调整chunksize来优化时间消耗