Skip to content
matt90luo's Blog
Go back

python-IO密集任务/计算密集任务/多线程/多进程

Python 作为编程语言存在多个具体实现,包括最常用的 CPython、超集 Cython、.NET 平台的 IronPython、JVM 上的 Jython,R 语言实现的 RPython、JIT 版本的 PyPy 等等。这里我们只讨论最常用的、官方的 CPython 实现。

<!— more —>

GIL是CPython解释器引入的锁,GIL在解释器层面阻止了真正的并行运行。解释器在执行任何线程之前,必须等待当前正在运行的线程释放GIL。事实上,解释器会强迫想要运行的线程必须拿到GIL才能访问解释器的任何资源,例如栈或Python对象等。这也正是GIL的目的——阻止不同的线程并发访问Python对象。这样GIL可以保护解释器的内存,让垃圾回收工作正常。但事实上,这却造成了程序员无法通过并行执行多线程来提高程序的性能。如果我们去掉CPython的GIL,就可以让多线程真正并行执行。GIL并没有影响多处理器并行的线程,只是限制了一个解释器只能有一个线程在运行

线程 VS 进程:

> - 进程是应用程序的一个执行实例,比如,在桌面上双击浏览器图标将会运行一个浏览器。 > - 线程是一个控制流程,可以在进程内与其他活跃的线程同时执行。“控制流程”指的是顺序执行一些机器指令。 > - 进程可以包含多个线程,所以开启一个浏览器,操作系统将创建一个进程,并开始执行这个进程的主线程。每一个线程将独立执行一系列的指令(通常就是一个函数),并且和其他线程并行执行。 > - 同一个进程内的线程可以共享一些地址空间和数据结构,所以线程也被称作“轻量进程”

对于IO密集型任务我们采用多线程,对于计算密集型任务,我们多采用多进程进行并行计算加速

函数调用

什么是栈帧(stack frame)

栈帧是存储函数调用活动的实体,每一次函数的调用,都会在调用栈(call stack)上维护一个独立的栈帧(stack frame).每个独立的栈帧一般包括:

  1. 函数的返回地址和参数
  2. 临时变量: 包括函数的非静态局部变量以及编译器自动生成的其他临时变量
  3. 函数调用的上下文栈是从高地址向低地址延伸,一个函数的栈帧用ebp 和 esp 这两个寄存器来划定范围.ebp 指向当前栈帧的底部,esp 始终指向栈帧的顶部;ebp 寄存器又被称为帧指针(Frame Pointer);esp 寄存器又被称为栈指针(Stack Pointer)

函数调用过程

在函数调用的过程中,有函数的调用者(caller)和被调用的函数(callee). 调用者需要知道被调用者函数返回值; 被调用者需要道传入的参数和返回的地址,函数调用分为以下几步:

  1. 参数入栈: 将参数按照调用约定(C 是从右向左)依次压入系统栈中;

  2. 返回地址入栈: 将当前代码区调用指令的下一条指令地址压入栈中,供函数返回时继续执行;

  3. 代码跳转: 处理器将代码区跳转到被调用函数的入口处;

  4. 栈帧调整:

    1. 将调用者的ebp压栈处理,保存指向栈底的ebp的地址(方便函数返回之后的现场恢复),此时esp指向新的栈顶位置push ebp
    2. 将当前栈帧切换到新栈帧(将eps值装入ebp,更新栈帧底部), 这时ebp指向栈顶,而此时栈顶就是old ebp mov ebp, esp
    3. 给新栈帧分配空间 sub esp, XXX

函数返回

函数返回分为以下几步:

  1. 保存被调用函数的返回值到 eax 寄存器中 mov eax, xxx
  2. 恢复 esp 同时回收局部变量空间 mov ebp, esp
  3. 将上一个栈帧底部位置恢复到 ebp pop ebp
  4. 弹出当前栈顶元素,从栈中取到返回地址,并跳转到该位置 ret

函数里面要用到数据,如果数据属于性线程级别的(比如函数形参—>局部变量—>存在栈上—>每个线程都有自己的栈),那么多线程同时调用是没关系的,因为用的都是本线程的数据;但是如果函数用到一些全局数据,比如全局变量,根据堆内存首地址去访问的堆内存(形参传入的),同时操作一个数据结构(如对一个链表的操作),静态局部变量,那就存在数据安全问题,必须要加锁对函数访问加锁。

因此需要互斥处理的,一般是函数中有全局变量,有动态申请的空间,有静态局部变量,有需要进程数据循环发送之类的操作需要进行互斥处理。

线程安全函数和可重入函数

线程安全的(Thread-Safe):如果一个函数在同一时刻可以被多个线程安全地调用,就称该函数是线程安全的。线程安全函数解决多个线程调用函数时访问共享资 源的冲突问题。 可重入(Reentrant):函数可以由多于一个线程并发使用,而不必担心数据错误。可重入函数可以在任意时刻被中断,稍后再继续运行,不会丢失数据。可重入   性解决函数运行结果的确定性和可重复性。可重入函数编写规范为:

> - 不在函数内部使用静态或全局数据  > - 不返回静态或全局数据,所有数据都由函数的调用者提供。  > - 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。 > - 如果必须访问全局变量,利用互斥机制来保护全局变量。 > - 不调用不可重入函数。

两者之间的关系:

> - 一个函数对于多个线程是可重入的,则这个函数是线程安全的。 > - 一个函数是线程安全的,但并不一定是可重入的。 > - 可重入性要强于线程安全性。

python的函数传参

在讨论python的函数是如何传参之前,我们先讨论一下c/c++的函数传参方式, 对于c语言来说只有一种 call-by-value, 简单说就是实参会如实拷贝至形参,而c++会多了几种,下面请看:

c/c++函数传参

  1. 按值传参 按值传参的概念非常好理解,就是函数接收到了传递过来的参数后,将其拷贝一份,其函数内部执行的代码操作的都是传递参数的拷贝。也就是说,按值传参最大的特点就是不会影响到传递过来的参数的值,但因为拷贝了一份副本,会更浪费资源一些。
  2. 按(左值)引用传参 简单来说形参是实参的别名
  3. 按常量引用传参,比如下面的例子
string randomItem( const vector&lt;string&gt; & arr );
  1. 按右值引用传参 C++11 全面引入移动语义:
    x = y
    can be a copy if y is an lvalue,
    but a move if y is an rvalue.

调用拷贝构造函数主要有以下场景:

#include &lt;iostream&gt;
using namespace std;
class A
&#123;
public:
    A():m_ptr(new int(0))&#123;cout &lt;&lt; "construct" &lt;&lt; endl;&#125; 
    A(const A& a):m_ptr(new int(*a.m_ptr))&#123; cout &lt;&lt; "copy construct" &lt;&lt; endl; &#125;
    A(A&& a) :m_ptr(a.m_ptr) &#123;
        a.m_ptr = nullptr; 
        cout &lt;&lt; "move construct" &lt;&lt; endl;
    &#125;
    ~A()&#123; delete m_ptr;&#125;
private:
        int* m_ptr;
&#125;;
A GetA()&#123;
    return A();
&#125;
int main() &#123;
    A a = GetA();
    return 0; 
&#125;

输出结果:

construct
move construct
move construct

使用 -fno-elide-constructors 选项编译上述代码,可以关闭编译器的ROV优化,输出结果表明,并没有调用拷贝构造函数,只调用了move construct函数。

这就是移动语义,需要注意的一个细节是,我们提供移动构造函数的同时也会提供一个拷贝构造函数,以防止移动不成功的时候还能拷贝构造。更详细的内容参考这篇文章 - 从4行代码看右值引用

python的name binding

python中一切皆对象, a = 1 这条语句就是把名字a绑定在1这个对象上

a = [1,2,3]
def foo(b):
    print(b is a)
    b.append(4)
    print(b is a)
foo(a)    # 会打印出 True  True 
print(a)  #  [1,2,3,4]

def bar(c):
    print(c is a)
    c = [0,0,0]
    print(c is a)
bar(a)    # 会打印出 True  False
print(a)  # [1,2,3,4]

我们把python中的这种引用方式称作句柄引用, 和c++中的别名引用有一定区别

python的multiprocessing

multiprocessing模块提供了Process能让我们通过创建进程对象并执行该进程对象的start方法来创建一个真正的进程来执行任务,该接口类似threading模块中的线程类Thread.

但是当被操作对象数目不大的时候可以使用Process动态生成多个进程,但是如果需要的进程数一旦很多的时候,手动限制进程的数量以及处理不同进程返回值会变得异常的繁琐,multiprocessing模块提供了一个进程池Pool类,负责创建进程池对象,并提供了一些方法来讲运算任务offload到不同的子进程中执行,并很方便的获取返回值。multiprocessing.pool提供以下多种方式可以用来将任务分给到各个子进程:

Multi-argsConcurrenceBlockingOrdered-results
Pool.applyNoNoYesYes
Pool.apply_asyncNoYesNoNo
Pool.mapNoYesYesYes
Pool.map_asyncNoYesNoYes
Pool.starmapYesYesYesYes
Pool.stamap_asyncYesYesNoYes

注意:

> - Pool.imap and Pool.imap_async – lazier version of map and map_async. > - Pool.starmap 和Pool.map 相比可以接受多参数函数 > - Async methods submit all the processes at once and retrieve the results once they are finished. Use get method to obtain the results. > - Pool.map(or Pool.apply)methods are very much similar to Python built-in map(or apply). They block the main process until all the processes complete and return the result. > - 需要说明的是,最后一列Ordered-results表示各个方法返回的结果是否是有序的, 不是表示各个函数的返回值是否是按照调用的顺序返回的

chunksize影响优化效率

现在有一个需要加速的计算任务, xl是一个list of set, 我们要判断yl列表中的每一个元素是否存在set内, 两层for循环嵌套计算,具体代码如下:

import numpy as np
import time
import multiprocessing

if __name__ == '__main__':

    xl = [set(map(lambda x: str(x), np.random.randint(400, size=100))) for i in range(80000)]
    yl = list(map(lambda x: str(x), np.random.randint(400, size=400)))

    #单进程执行
    local_time = time.time()
    print("start pf time ", local_time)
    res = [ iy in ix for iy in yl for ix in xl]
    end_time = time.time()
    print("pf during time ", end_time - local_time)
    print(len(res), res[0:8])

输出:

multiprocessing.cpu_count() 4
start pf time  1585446005.5070348
pf during time  6.3575873374938965
32000000 [False, False, False, False, False, False, True, False]

我们使用pool.starmap进行并行加速

import numpy as np
import time
import multiprocessing
def func_task(x, yl): return [y in x for y in yl]
if __name__ == '__main__':

    print("multiprocessing.cpu_count()", multiprocessing.cpu_count())
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    xl = [set(map(lambda x: str(x), np.random.randint(400, size=100))) for i in range(80000)]
    yl = list(map(lambda x: str(x), np.random.randint(400, size=400)))

    #使用starmap
    task = [(ix, yl) for ix in xl]
    local_time = time.time()
    print("start pf time ", local_time)
    results = pool.starmap(func_task, task)
    end_time = time.time()
    print("pf during time ", end_time - local_time)
    print(len(results), results[0][0:8])

输出:

multiprocessing.cpu_count() 4
start pf time  1585446134.5832891
pf during time  7.450953006744385
80000 [False, False, False, False, False, False, False, False]

居然变慢了,这是怎么回事? 带着这样的疑问查阅了相关资料 python-multiprocessing-understanding-logic-behind-chunksize

我的理解是合适的chunksize与具体的问题规模以及计算量相关, 不存在普适的公式,原文说:

> More chunks mean more overhead, but increased scheduling flexibility. How this answer will show, this leads to a higher worker-utilization on average, but without the guarantee of a shorter overall computation time for every case.

更过的chunks意味着更多的资源消耗,但是会提升scheduling flexibility,这再一定程度提升了平均cpu使用, 但是不能确保整体time消耗降低

据此我尝试了chunksize分别是10,100,1000,10000的情况,结果如下:

chunksizetime consumption (seconds)
105.82
1006.18
10005.61
100009.42

在chunksize=256时取得了4.8S的时间消耗

小结


Share this post on:

Previous Post
从多臂赌博机谈起
Next Post
大数据-DataFlow编程模型