[python] Python并行计算库Joblib使用指北

Joblib是用于高效并行计算的Python开源库,其提供了简单易用的内存映射和并行计算的工具,以将任务分发到多个工作进程中。Joblib库特别适合用于需要进行重复计算或大规模数据处理的任务。Joblib库的官方仓库见:
joblib
,官方文档见:
joblib-doc

Jolib库安装代码如下:

pip install joblib

# 查看版本
import joblib
joblib.__version__
'1.4.2'

1 使用说明

Joblib库主要功能涵盖以下三大块:

  • 记忆模式:Memory类将函数的返回值缓存到磁盘。下次调用时,如果输入参数不变,就直接从缓存中加载结果,避免重复计算。
  • 并行计算:Parallel类将任务拆分到多个进程或者线程中并行执行,加速计算过程。
  • 高效的序列化:针对NumPy数组等大型数据对象进行了优化,且序列化和反序列化速度快。

1.1 Memory类

Joblib库的Memory类支持通过记忆模式,将函数的计算结果存储起来,以便在下次使用时直接调用。这种机制的优势在于加速计算过程、节约资源以及简化管理。

Memory类构造函数如下:

class joblib.Memory(location=None, backend='local', mmap_mode=None, compress=False, verbose=1, bytes_limit=None, backend_options=None)

参数介绍如下:

  • location: 缓存文件的存放位置。如果设置为 None,则不缓存。
  • backend: 缓存的后端存储方式。默认是 “local”,表示使用本地文件系统。
  • mmap_mode: 一个字符串,表示内存映射文件的模式(None, ‘r+’, ‘r’, ‘w+’, ‘c’)。
  • compress: 表示是否压缩缓存文件。压缩可以节省磁盘空间,但会增加 I/O 操作的时间。
  • verbose: 一个整数,表示日志的详细程度。0 表示没有输出,1 表示只输出警告,2 表示输出信息,3 表示输出调试信息。
  • bytes_limit: 一个整数或 None,表示缓存使用的字节数限制。如果缓存超过了这个限制,最旧的缓存文件将被删除。
  • backend_options: 传递给缓存后端的选项。


Memory类简单使用

下面代码展示第一次调用函数并缓存结果:

from joblib import Memory
import os, shutil
# 创建一个Memory对象,指定缓存目录为当前目录下的run文件夹
# verbose=0表示关闭详细输出
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)

# 使用@memory.cache装饰器,将函数f的结果缓存起来
@memory.cache
def f(x):
    # 只有当函数的输入参数x没有被缓存时,才会执行函数体内的代码
    print('Running f(%s)' % x)
    return x

# 第一次调用f(1),会执行函数体内的代码,并将结果缓存起来
print(f(1))
Running f(1)
1

第二次调用函数:

# 第二次调用f(1),由于结果已经被缓存,不会再次执行函数体内的代码,而是直接从缓存中读取结果
print(f(1))
1

调用其他函数:

# 调用f(2),由于输入参数不同,会再次执行函数体内的代码,并将结果缓存起来
print(f(2))
Running f(2)
2


将Memory类应用于numpy数组

import numpy as np
from joblib import Memory
import os, shutil
cachedir = './run'
if os.path.exists(cachedir):
    shutil.rmtree(cachedir)
memory = Memory(cachedir, verbose=0)

@memory.cache
def g(x):
    print('A long-running calculation, with parameter %s' % x)
    # 返回汉明窗
    return np.hamming(x)

@memory.cache
def h(x):
    print('A second long-running calculation, using g(x)')
    # 生成范德蒙德矩阵
    return np.vander(x)

# 调用函数g,传入参数3,并将结果存储在变量a中
a = g(3)
# 打印变量a的值
print(a)

# 再次调用函数g,传入相同的参数3,由于结果已被缓存,不会重新计算
print(g(3))
A long-running calculation, with parameter 3
[0.08 1.   0.08]
[0.08 1.   0.08]

直接计算和缓存结果是等同的:

# 调用函数h,传入变量a作为参数,并将结果存储在变量b中
b = h(a)
# 再次调用函数h,传入相同的参数a,由于结果已被缓存,不会重新计算
b2 = h(a)

# 使用numpy的allclose函数检查b和b2是否足够接近,即它们是否相等
print(np.allclose(b, b2))
A second long-running calculation, using g(x)
True


直接调用缓存结果

import numpy as np
from joblib import Memory

import os, shutil

# 设置缓存目录的路径。
cachedir = './run'

# 检查缓存目录是否存在。
if os.path.exists(cachedir):
    # 如果缓存目录存在,使用shutil.rmtree删除该目录及其内容。
    shutil.rmtree(cachedir)

# 初始化Memory对象,设置缓存目录为上面定义的cachedir,mmap_mode设置为'r',表示只读模式。
memory = Memory(cachedir, mmap_mode='r', verbose=0)

# 使用memory.cache装饰器缓存np.square函数的结果。
square = memory.cache(np.square)

a = np.vander(np.arange(3)).astype(float)

# 打印通过square函数处理后的矩阵a。
print(square(a))

# 获取a的缓存结果
result = square.call_and_shelve(a)
print(result.get())  # 获取并打印缓存的结果。
[[ 0.  0.  1.]
 [ 1.  1.  1.]
 [16.  4.  1.]]
[[ 0.  0.  1.]
 [ 1.  1.  1.]
 [16.  4.  1.]]


类中使用缓存

Memory类不建议将其直接用于类方法。如果想在类中使用缓存,建议的模式是在类中使用单独定义的缓存函数,如下所示:

@memory.cache
def compute_func(arg1, arg2, arg3):
    pass

class Foo(object):
    def __init__(self, args):
        self.data = None

    def compute(self):
        # 类中调用缓存的函数
        self.data = compute_func(self.arg1, self.arg2, 40)

1.2 Parallel类

Joblib库的Parallel类用于简单快速将任务分解为多个子任务,并分配到不同的CPU核心或机器上执行,从而显著提高程序的运行效率。

Parallel类构造函数及主要参数如下:

class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, batch_size='auto', pre_dispatch='2 * n_jobs', temp_folder=default(None), max_nbytes=default('1M'), require=default(None))

参数介绍如下:

  • n_jobs: 指定并行任务的数量,为-1时表示使用所有可用的CPU核心;为None时表示使用单个进程。
  • backend:指定并行化的后端,可选项:

    • ‘loky’:使用
      loky
      库实现多进程,该库由joblib开发者开发,默认选项。
    • ‘threading’:使用threading库实现多线程。
    • ‘multiprocessing’:使用multiprocessing库实现多进程。
  • return_as:返回结果格式,可选项:

    • ‘list:列表。
    • generator:按照任务提交顺序生成结果的生成器。
    • generator_unordered:按照执行结果完成先后顺序的生成器。
  • verbose: 一个整数,表示日志的详细程度。0 表示没有输出,1 表示只输出警告,2 表示输出信息,3 表示输出调试信息。
  • timeout:单个任务最大运行时长,超时将引发TimeOutError。仅适用于n_jobs不为1的情况。
  • batch_size:当Parallel类执行任务时,会将任务分批处理。batch_size参数决定了每个批次中包含的任务数。
  • pre_dispatch: 用来决定在并行计算开始之前,每个批次有多少个任务会被预先准备好并等待被分配给单个工作进程。默认值为“2*n_jobs”,表示并行计算时可以使用2倍工作进程的任务数量。
  • temp_folder:指定临时文件的存储路径。
  • max_nbytes:传递给工作程序的数组大小的阈值。
  • require:对运行任务的要求,可选None和sharedmem。sharedmem表示将使用共享内存来执行并行任务,但会影响计算性能。


简单示例

以下代码展示了单线程直接运行计算密集型任务结果:

from joblib import Parallel, delayed
import numpy as np
import time

start = time.time()

# 定义一个计算密集型函数
def compute_heavy_task(data):
    # 模拟处理时间
    time.sleep(1)
    # 数值计算
    result = np.sum(np.square(data))
    return result

# 生成一些模拟数据
# 设置随机数生成器的种子
np.random.seed(42) 
data = np.random.rand(10, 1000)  # 10个1000维的向量
results = [compute_heavy_task(d) for d in data]

# 打印结果的和
print(f"结果: {sum(results)}")
print(f"耗时:{time.time()-start}s")
结果: 3269.16485027708
耗时:10.101513624191284s

以下代码展示利用Parallel类创建多进程运行计算密集型任务结果:

from joblib import Parallel, delayed
import numpy as np
import time

start = time.time()

# 定义一个计算密集型函数
def compute_heavy_task(data):
    # 模拟处理时间
    time.sleep(1)
    # 数值计算
    result = np.sum(np.square(data))
    return result
    
# 设置随机数生成器的种子
np.random.seed(42) 
# 生成一些模拟数据
data = np.random.rand(10, 1000)  # 10个1000维的向量

# 使用Parallel来并行执行任务
results = Parallel(n_jobs=8, return_as="generator")(delayed(compute_heavy_task)(d) for d in data)

# 打印结果的和
print(f"结果: {sum(results)}")
print(f"耗时:{time.time()-start}s")
结果: 3269.16485027708
耗时:2.381772041320801s

可以看到joblib库利用多进程技术显著提高了任务执行的效率。然而,当面对I/O密集型任务或执行时间极短的任务时,多线程或多进程的优势可能并不明显。这是因为线程创建和上下文切换的开销有时可能超过任务本身的执行时间。以上述的compute_heavy_task函数为例,如果移除了其中的time.sleep函数,多进程执行所需的时间将会显著增加。

此外获取当前系统的cpu核心数(逻辑处理器)代码如下:

import joblib

# 获取当前系统的cpu核心数
n_cores = joblib.cpu_count()

print(f'系统的核心数是:{n_cores}')
系统的核心数是:16


不同并行方式对比

以下代码展示了不同并行方式在Parallel类中的应用。默认使用loky多进程:

# 使用loky多进程
from joblib import Parallel, delayed
import numpy as np
import time

start = time.time()

# 定义一个计算密集型函数
def compute_heavy_task(data):
    # 模拟处理时间
    time.sleep(1)
    # 数值计算
    result = np.sum(np.square(data))
    return result

# 生成一些模拟数据
data = np.random.rand(10, 1000)  # 10个1000维的向量
results = Parallel(n_jobs=8, return_as="generator", backend='loky')(delayed(compute_heavy_task)(d) for d in data)

# 打印结果的和
print(f"结果: {sum(results)}")
print(f"耗时:{time.time()-start}s")
结果: 3382.3336437893217
耗时:2.042675256729126s

以下代码展示了threading多线程的使用,注意由于Python的全局解释器锁(GIL)确保在任何时刻只有一个线程执行Python字节码。这表明即使在多核处理器上,Python的线程也无法实现真正的并行计算。然而,当涉及到处理I/O密集型任务或需要快速响应的小规模任务时,多线程依然具有优势:

# 使用threading多线程
start = time.time()
results = Parallel(n_jobs=8, return_as="generator", backend = 'threading')(delayed(compute_heavy_task)(d) for d in data)

# 打印结果的和
print(f"结果: {sum(results)}")
print(f"耗时:{time.time()-start}s")
结果: 3382.3336437893217
耗时:2.040527105331421s

以下代码展示了multiprocessing多进程的使用,注意Windows下需要将multiprocessing相关代码放在main函数中:

from joblib import Parallel, delayed
import numpy as np
import time

# 定义一个计算密集型函数
def compute_heavy_task(data):
    # 模拟处理时间
    time.sleep(1)
    # 数值计算
    result = np.sum(np.square(data))
    return result

def main():
    start = time.time()

    # 生成一些模拟数据
    data = np.random.rand(10, 1000)  # 10个1000维的向量
    # multiprocessing不支持返回rgenerator
    results = Parallel(n_jobs=8, return_as="list",  backend='multiprocessing')(delayed(compute_heavy_task)(d) for d in data)

    # 打印结果的和
    print(f"结果: {sum(results)}")
    print(f"耗时:{time.time()-start}s")
    
if __name__ == '__main__':
    main()
结果: 3304.6651996375645
耗时:2.4303956031799316s

以下是

loky



threading



multiprocessing

的一些关键特性对比:

特性/库
loky

threading

multiprocessing
适用平台 跨平台 跨平台 跨平台,但Windows上存在限制
进程/线程模型 进程 线程 进程
GIL影响
适用场景 CPU密集型任务 I/O密集型任务 CPU密集型任务
启动开销 较小 较小 较大
内存使用 较高 较低 较高
进程间通信 通过管道、队列等 通过共享数据结构 通过管道、队列等
线程间通信 共享数据结构 共享数据结构 不适用
异常处理 进程间独立 线程间共享 进程间独立
调试难度 较高 较低 较高
适用框架 通用 通用 通用

Python中线程和进程简单对比如下:

  • 资源共享:线程共享同一进程的内存和资源,而进程拥有独立的内存空间。
  • GIL影响:线程受GIL限制,进程不受GIL限制。
  • 开销:线程的创建和切换开销小,进程的创建和切换开销大。
  • 适用性:线程适合I/O密集型任务,进程适合CPU密集型任务。
  • 通信:线程间通信简单但需要处理同步问题,进程间通信复杂但天然隔离。

在实际应用中,选择使用线程还是进程取决于任务的特性和性能需求。如果任务主要是I/O密集型,使用线程可以提高性能;如果任务是CPU密集型,使用进程可以更好地利用多核处理器的计算能力。


共享内存

默认情况下,Parallel类执行任务时各个任务不共享内存,如下所示:

from joblib import Parallel, delayed
shared_set = set()
def collect(x):
   shared_set.add(x)
Parallel(n_jobs=2)(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))
[]

通过设置require=’sharedmem’可以实现内存共享:

# require='sharedmem'表示需要共享内存,以确保多个进程可以访问shared_set集合
Parallel(n_jobs=2, require='sharedmem')(delayed(collect)(i) for i in range(5))
print(sorted(shared_set))
[0, 1, 2, 3, 4]


上下文管理器

一些算法需要对一个并行函数进行多次连续调用,但在循环中多次调用joblib.Parallel是次优的,因为这将多次创建和销毁一组工作进程,从而导致显著的性能开销。

对于这种情况,使用joblib.Parallel类的上下文管理器API更为高效,可以重用同一组工作进程进行多次调用joblib.Parallel对象。如下所示:

from joblib import Parallel, delayed
import math
with Parallel(n_jobs=2) as parallel:
   accumulator = 0.
   n_iter = 0
   while accumulator < 1000:
       results = parallel(delayed(math.sqrt)(accumulator + i ** 2) for i in range(5))
       accumulator += sum(results)
       n_iter += 1
print(accumulator, n_iter)  
1136.5969161564717 14


parallel_config

Joblib提供parallel_config类用于配置并行执行的参数,比如并行的后端类型、批处理大小等,这些配置可以影响后续所有的parallel实例。它通常在调用Parallel类之前使用。关于parallel_config使用见:
parallel_config

1.3 序列化

joblib.dump()和joblib.load()提供了一种替代pickle库的方法,可以高效地序列化处理包含大量数据的任意Python对象,特别是大型的NumPy数组。关于pickle库使用见:
Python数据序列化模块pickle使用笔记
。两者效果对比见:

特点 pickle joblib
性能 一般 针对NumPy数组等大数据类型有优化,通常更快
并行处理 不支持 内置并行处理功能,可以加速任务
内存映射 不支持 支持内存映射,可以高效处理大文件
压缩 支持 支持压缩,可以减少存储空间
附加功能 提供了一些额外的功能,如缓存、延迟加载等

以下代码展示了joblib.dump的基本使用:

from tempfile import mkdtemp

# 使用mkdtemp创建一个临时目录,并将目录路径存储在变量savedir中。
savedir = mkdtemp(dir='./')

import os
# 文件保存路径
filename = os.path.join(savedir, 'test.joblib')

import numpy as np
import pandas as pd
import joblib

# 创建一个要持久化的字典
to_persist = [('a', [1, 2, 3]), ('b', np.arange(10)), ('c', pd.DataFrame(np.ones((5,5))))]

# 使用joblib.dump函数将to_persist字典序列化并保存到filename指定的文件中
# 注意pickle库无法序列化numpy数据
joblib.dump(to_persist, filename)
['./tmp82ms1z5w\\test.joblib']

使用joblib.load函数从指定的文件中加载之前保存的序列化数据:

joblib.load(filename)
[('a', [1, 2, 3]),
 ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 ('c',
       0    1    2    3    4
  0  1.0  1.0  1.0  1.0  1.0
  1  1.0  1.0  1.0  1.0  1.0
  2  1.0  1.0  1.0  1.0  1.0
  3  1.0  1.0  1.0  1.0  1.0
  4  1.0  1.0  1.0  1.0  1.0)]

joblib.dump和joblib.load函数还接受文件对象:

with open(filename, 'wb') as fo:  
    # 使用joblib将对象to_persist序列化并写入文件
   joblib.dump(to_persist, fo)
with open(filename, 'rb') as fo:  
   joblib.load(fo)

此外joblib.dump也支持设置compress参数以实现数据压缩:

# compress参数为压缩级别,取值为0到9,值越大压缩效果越好。为0时表示不压缩,默认值为0
joblib.dump(to_persist, filename, compress=1)
['./tmp82ms1z5w\\test.joblib']

默认情况下,joblib.dump使用zlib压缩方法,因为它在速度和磁盘空间之间实现了最佳平衡。其他支持的压缩方法包括“gzip”、“bz2”、“lzma”和“xz”。compress参数输入带有压缩方法和压缩级别就可以选择不同压缩方法:

joblib.dump(to_persist, filename + '.gz', compress=('gzip', 3))  
joblib.load(filename + '.gz')
joblib.dump(to_persist, filename + '.bz2', compress=('bz2', 5))  
joblib.load(filename + '.bz2')
[('a', [1, 2, 3]),
 ('b', array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 ('c',
       0    1    2    3    4
  0  1.0  1.0  1.0  1.0  1.0
  1  1.0  1.0  1.0  1.0  1.0
  2  1.0  1.0  1.0  1.0  1.0
  3  1.0  1.0  1.0  1.0  1.0
  4  1.0  1.0  1.0  1.0  1.0)]

除了默认压缩方法,lz4压缩算法也可以用于数据压缩。前提是需要安装lz4压缩库:

pip install lz4

在这些压缩方法中,lz4和默认方法效果较好。lz4使用方式与其他压缩方式一样:

joblib.dump(to_persist, filename, compress=('lz4', 3))  
['./tmp82ms1z5w\\test.joblib']

2 实例

2.1 joblib缓存和并行

本实例展示了利用joblib缓存和并行来加速任务执行。以下代码展示了一个高耗时任务:

# 导入time模块,用于实现延时功能
import time

# 定义一个模拟耗时计算的函数
def costly_compute(data, column):
    # 休眠1秒,模拟耗时操作
    time.sleep(1)
    # 返回传入数据的指定列
    return data[column]

# 定义一个计算数据列平均值的函数
def data_processing_mean(data, column):
    # 调用costly_compute函数获取指定列的数据
    column_data = costly_compute(data, column)
    # 计算并返回该列数据的平均值
    return column_data.mean()

# 导入numpy库,并设置随机数生成器的种子,以保证结果的可复现性
import numpy as np
rng = np.random.RandomState(42)
# 生成1000行4列的随机数据矩阵
data = rng.randn(int(1000), 4)

# 记录开始时间
start = time.time()
# 对数据的每一列计算平均值,并将结果存储在results列表中
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
# 记录结束时间
stop = time.time()

# 打印处理过程的描述信息
print('\nSequential processing')
# 打印整个处理过程的耗时
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
Sequential processing
Elapsed time for the entire processing: 4.05 s

下段代码演示了如何使用joblib库来缓存和并行化计算上述任务:

# 导入time模块,用于模拟耗时操作。
import time

# 定义一个使用缓存的函数,用于计算数据的均值。
def data_processing_mean_using_cache(data, column):
    return costly_compute_cached(data, column).mean()

# 从joblib库导入Memory类,用于缓存函数的输出。
from joblib import Memory

# 设置缓存的存储位置和详细程度
location = './cachedir'
memory = Memory(location, verbose=0)

# 使用Memory对象的cache方法来缓存costly_compute函数的输出。
costly_compute_cached = memory.cache(costly_compute)

# 从joblib库导入Parallel和delayed类,用于并行执行函数。
from joblib import Parallel, delayed

# 记录开始时间。
start = time.time()

# 使用Parallel类并行执行data_processing_mean_using_cache函数,对数据的每一列进行处理。
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)  
    for col in range(data.shape[1])) 

# 记录结束时间。
stop = time.time()

# 打印第一轮处理的耗时信息,包括缓存数据的时间。
print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))
First round - caching the data
Elapsed time for the entire processing: 2.05 s

再次执行相同的过程,可以看到结果被缓存而不是重新执行函数:

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nSecond round - reloading from the cache')
print('Elapsed time for the entire processing: {:.2f} s'.format(stop - start))

# 如果不想使用缓存结果,可以清除缓存信息
memory.clear(warn=False)
Second round - reloading from the cache
Elapsed time for the entire processing: 0.02 s

2.2 序列化

以下示例展示了在joblib.Parallel中使用序列化内存映射(numpy.memmap)。内存映射可以将大型数据集分割成小块,并在需要时将其加载到内存中。这种方法可以减少内存使用,并提高处理速度。

定义耗时函数:

import numpy as np

data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]

import time

def slow_mean(data, sl):
    time.sleep(0.01)
    return data[sl].mean()

以下代码是直接调用函数的运行结果:

tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.49 s

以下代码是调用Parallel类2个进程运行的结果,由于整体任务计算耗时较少。所以Parallel类并行计算并没有比直接调用函数有太多速度优势,因为进程启动销毁需要额外时间:

from joblib import Parallel, delayed

tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'.format(toc - tic))
Elapsed time computing the average of couple of slices 1.00 s

以下代码提供了joblib.dump和load函数加速数据读取。其中dump函数用于将data对象序列化并保存到磁盘上的文件中,同时创建了一个内存映射,使得该文件可以像内存数组一样被访问。当程序再次加载这个文件时,可以使用load函数以内存映射模式打开:

import os 
from joblib import dump, load  # 从joblib库导入dump和load函数,用于创建和加载内存映射文件

# 设置内存映射文件的文件夹路径
folder = './memmap'
os.makedirs(folder, exist_ok = True)

# 将内存映射文件的名称与路径结合
data_filename_memmap = os.path.join(folder, 'data_memmap.joblib')

# 使用dump函数将数据对象'data'保存到内存映射文件
dump(data, data_filename_memmap)

# 使用load函数加载内存映射文件,mmap_mode='r'表示以只读模式打开
data_ = load(data_filename_memmap, mmap_mode='r')

# 记录开始时间
tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data_, sl) for sl in slices)

# 记录结束时间
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'.format(toc - tic))  

import shutil
# 结束时删除映射文件
try:
    shutil.rmtree(folder)
except: 
    pass
Elapsed time computing the average of couple of slices 0.77 s

2.3 内存监视

本实例展示不同并行方式的内存消耗情况。


创建内存监视器

from psutil import Process
from threading import Thread

class MemoryMonitor(Thread):
    """在单独的线程中监控内存使用情况(以MB为单位)。"""
    def __init__(self):
        super().__init__()  # 调用父类Thread的构造函数
        self.stop = False  # 用于控制线程停止的标记
        self.memory_buffer = []  # 用于存储内存使用记录的列表
        self.start()  # 启动线程

    def get_memory(self):
        """获取进程及其子进程的内存使用情况。"""
        p = Process()  # 获取当前进程
        memory = p.memory_info().rss  # 获取当前进程的内存使用量
        for c in p.children():  # 遍历所有子进程
            memory += c.memory_info().rss  # 累加子进程的内存使用量
        return memory

    def run(self):
        """线程运行的主体方法,周期性地记录内存使用情况。"""
        memory_start = self.get_memory()  # 获取初始内存使用量
        while not self.stop:  # 当未设置停止标记时循环
            self.memory_buffer.append(self.get_memory() - memory_start)  # 记录当前内存使用量与初始内存使用量的差值
            time.sleep(0.2)  # 休眠0.2秒

    def join(self):
        """重写join方法,设置停止标记并等待线程结束。"""
        self.stop = True  # 设置停止标记
        super().join()  # 调用父类方法等待线程结束


并行任务

结果返回list的并行任务:

import time
import numpy as np

def return_big_object(i):
    """生成并返回一个大型NumPy数组对象。"""
    time.sleep(.1)  # 休眠0.1秒模拟耗时操作
    return i * np.ones((10000, 200), dtype=np.float64) 

def accumulator_sum(generator):
    """累加生成器生成的所有值,并打印进度。"""
    result = 0
    for value in generator:
        result += value
        # print(".", end="", flush=True)  # 打印点号并刷新输出
    # print("")  # 打印换行符
    return result

from joblib import Parallel, delayed

monitor = MemoryMonitor()  # 创建内存监控器实例,并启动监视
print("Running tasks with return_as='list'...")  # 打印启动任务信息
res = Parallel(n_jobs=2, return_as="list")(
    delayed(return_big_object)(i) for i in range(150)  # 使用joblib的Parallel功能并行执行任务
)
res = accumulator_sum(res)  # 累加结果
print('All tasks completed and reduced successfully.')  # 打印任务完成信息

# 报告内存使用情况
del res  # 清理结果以避免内存边界效应
monitor.join()  # 等待内存监控线程结束
peak = max(monitor.memory_buffer) / 1e9  # 计算峰值内存使用量,并转换为GB
print(f"Peak memory usage: {peak:.2f}GB")  # 打印峰值内存使用量
Running tasks with return_as='list'...
All tasks completed and reduced successfully.
Peak memory usage: 2.44GB

如果改为输出生成器,那么内存使用量将会大大减少:

monitor_gen = MemoryMonitor()  # 创建内存监控器实例,并启动监视
print("Running tasks with return_as='generator'...")  # 打印启动任务信息
res = Parallel(n_jobs=2, return_as="generator")(
    delayed(return_big_object)(i) for i in range(150)  
)
res = accumulator_sum(res)  # 累加结果
print('All tasks completed and reduced successfully.')  # 打印任务完成信息

# 报告内存使用情况
del res  # 清理结果以避免内存边界效应
monitor_gen.join()  # 等待内存监控线程结束
peak = max(monitor_gen.memory_buffer) / 1e9  # 计算峰值内存使用量,并转换为GB
print(f"Peak memory usage: {peak:.2f}GB")  # 打印峰值内存使用量
Running tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 0.19GB

下图展示了以上两种方法的内存消耗情况,第一种情况涉及到将所有结果存储在内存中,直到处理完成,这可能导致内存使用量随着时间线性增长。而第二种情况generator则涉及到流式处理,即结果被实时处理,因此不需要同时在内存中存储所有结果,从而减少了内存使用的需求:

import matplotlib.pyplot as plt
plt.figure(0)
plt.semilogy(
    np.maximum.accumulate(monitor.memory_buffer),
    label='return_as="list"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_gen.memory_buffer),
    label='return_as="generator"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()


进一步节省内存

前一个例子中的生成器是保持任务提交的顺序的。如果某些进程任务提交晚,但比其他任务更早完成。相应的结果会保持在内存中,以等待其他任务完成。如果任务对结果返回顺序无要求,例如最后只是对所有结果求和,可以使用generator_unordered减少内存消耗。如下所示:

# 创建一个每个任务耗时可能不同的处理函数
def return_big_object_delayed(i):
    if (i + 20) % 60:
        time.sleep(0.1)
    else:
        time.sleep(5)
    return i * np.ones((10000, 200), dtype=np.float64)

返回为generator格式的内存使用:

monitor_delayed_gen = MemoryMonitor()
print("Create result generator on delayed tasks with return_as='generator'...")
res = Parallel(n_jobs=2, return_as="generator")(
    delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')

del res 
monitor_delayed_gen.join()
peak = max(monitor_delayed_gen.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator'...
All tasks completed and reduced successfully.
Peak memory usage: 784.23MB

返回为generator_unordered格式的内存使用:

monitor_delayed_gen_unordered = MemoryMonitor()
print(
  "Create result generator on delayed tasks with "
  "return_as='generator_unordered'..."
)
res = Parallel(n_jobs=2, return_as="generator_unordered")(
    delayed(return_big_object_delayed)(i) for i in range(150)
)
res = accumulator_sum(res)
print('All tasks completed and reduced successfully.')

del res 
monitor_delayed_gen_unordered.join()
peak = max(monitor_delayed_gen_unordered.memory_buffer) / 1e6
print(f"Peak memory usage: {peak:.2f}MB")
Create result generator on delayed tasks with return_as='generator_unordered'...
All tasks completed and reduced successfully.
Peak memory usage: 175.22MB

内存使用结果对比如下。基于generator_unordered选项在执行任务时,能够独立地处理每个任务,而不需要依赖于其他任务的完成状态。但是要注意的是由于系统负载、后端实现等多种可能影响任务执行顺序的因素,结果的返回顺序是不确定的:

plt.figure(1)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen.memory_buffer),
    label='return_as="generator"'
)
plt.semilogy(
    np.maximum.accumulate(monitor_delayed_gen_unordered.memory_buffer),
    label='return_as="generator_unordered"'
)
plt.xlabel("Time")
plt.xticks([], [])
plt.ylabel("Memory usage")
plt.yticks([1e7, 1e8, 1e9], ['10MB', '100MB', '1GB'])
plt.legend()
plt.show()

3 参考

  • joblib
  • joblib-doc
  • loky
  • parallel_config
  • Python数据序列化模块pickle使用笔记

未经允许不得转载:大白鲨游戏网 » [python] Python并行计算库Joblib使用指北