基于 Python concurrent库的 IO 密集型任务多线程解决方案

基于 Python concurrent库的 I/O 密集型任务多线程解决方案

Posted by G.h. Qi on July 15, 2024

前言

预计阅读时长: 2 min
原文链接:基于 Python concurrent.futures 库的 I/O 密集型任务多线程解决方案
原作者:Gh Qi ,如需引用转载,请注明来源。

1. 写在前面

1.1 多线程与多进程

在Python中,多线程和多进程的区别主要体现在以下几个方面:

(1). 全局解释器锁(GIL)

  • 多线程:由于Python的GIL,多线程在CPU密集型任务中无法真正实现并行。GIL限制了同一时间只有一个线程执行Python字节码。
  • 多进程:每个进程都有自己的Python解释器和GIL,因此可以实现真正的并行,适合CPU密集型任务。

(2). 模块支持

  • 多线程:使用threading模块来实现多线程编程。适合I/O密集型任务,如网络请求和文件操作。
  • 多进程:使用multiprocessing模块来实现多进程编程。适合需要并行处理的CPU密集型任务。

(3). 内存使用

  • 多线程:线程共享进程的内存空间,因此内存使用较少。
  • 多进程:每个进程有独立的内存空间,内存开销较大。

(4). 数据共享

  • 多线程:共享数据简单,但需要使用锁等同步机制来避免资源竞争。
  • 多进程:数据共享复杂,通常通过队列、管道或共享内存来实现。

(5). 稳定性

  • 多线程:一个线程崩溃可能影响整个进程。
  • 多进程:一个进程崩溃不会影响其他进程。

1.2 Python-concurrent库

在Python中,如果需要并行处理CPU密集型任务,推荐使用多进程。而对于I/O密集型任务,多线程可以更有效地提高性能。

在Python中,concurrent.futures库提供了一种高级接口来实现多线程和多进程编程。使用ThreadPoolExecutor可以方便地进行多线程编程。以下是一些关键点和示例:

concurrent.futures库简介

ThreadPoolExecutor:用于管理线程池,适合I/O密集型任务。 ProcessPoolExecutor:用于管理进程池,适合CPU密集型任务。

本文主要针对I/O密集型任务,介绍基于ThreadPoolExecutor的多线程解决方案。

2. 基于ThreadPoolExecutor的多线程解决方案

2.1 executor.map()函数简介

executor.mapconcurrent.futures 模块中 ThreadPoolExecutorProcessPoolExecutor 的一个便捷方法,它用于将一个函数应用于可迭代对象的每个元素上,并收集结果。下面是 executor.map 函数参数的官方定义:

executor.map(func, *iterables)
--func: 要应用于每个输入项的函数
--*iterables: 一个或多个可迭代对象(executor.map  func 应用于这些可迭代对象的元素)

用人话讲:executor.map(func,*iterables)的第一个参数是函数名,第二个参数中的 iterables 是一个列表,里面存放的是 func 函数的参数。 func 函数需要几个参数,列表 iterables 中就有几个元组,每个元组包含了所有线程所需的该参数的值。

例如:定义一个求两数之和函数:sum(a,b)。我想求1和2的和,3和4的和,5和6的和,这件事安排给3个线程来干,那么func=sumiterables=[(1, 3, 5), (2, 4, 6)]

data_to_process = [(1, 2), (3, 4), (5, 6)]
iterables = *zip(*data_to_process)  # 第二个参数

print(iterables)    # 打印第二个参数

输出为:

(1, 3, 5) (2, 4, 6)
executor.map(sum,iterables) # 多线程求和

iterables 里面给定参数组数可以低于max_workers,那么并不是所有的线程都会被分配工作。线程池中的线程会等待直到有任务可做,或者直到所有任务完成。当然也可以高于max_workers,那就只能排队干活了。

那么你可能就会问了,我的函数执行完之后还有个return呢,你这么搞,我返回的值去哪找呢?

executor.map 返回的是一个可迭代对象,它按顺序提供了 func 应用于每个输入项的结果。在 Python 3.7 之前,它返回一个列表;从 Python 3.7 开始,它返回一个 concurrent.futures._base._as_completed 对象,这个对象是可迭代的,但不是列表。

<generator object Executor.map.<locals>.result_iterator at 0x000001FB535D72E0>

要打印每个线程执行函数的输出结果,可以迭代 executor.map 返回的对象;也可以使用list()将其转化为一个列表。

# 方法1  

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(process_data, *zip(*data_to_process))
print(results)

#返回一个 concurrent.futures._base._as_completed 对象,这个对象是可迭代的

for result in results:
    print(result)

输出:

<generator object Executor.map.<locals>.result_iterator at 0x000002A7435F3790>
3
7
11
# 方式2  

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(process_data, *zip(*data_to_process))
    results = list(results)
print(results)

输出:

[3, 7, 11]

完整代码:

from concurrent.futures import ThreadPoolExecutor

# ---------------定义要执行的函数---------------

def process_data(arg1, arg2): 
    # 这里可以是任何需要两个参数的逻辑

    return arg1 + arg2

# ---------------创建数据列表------------------  

# 我想求1和2的和,3和4的和,5和6的和,参数如下:  

data_to_process = [(1, 2), (3, 4), (5, 6)]

# 使用 zip 将 data_to_process 中的元组解包为两个参数

iterables = *zip(*data_to_process)

# iterables=[ (1, 3, 5), (2, 4, 6)]

# -------创建线程池(最大线程数不超过5个)-------    

with ThreadPoolExecutor(max_workers=5) as executor: 
    results = executor.map(process_data,iterables)  # 多线程求和

    results = list(results) # 获取结果  
    
# --------------打印每个线程的输出结果-----------  

for result in results: 
    print(result)

2.2 示例

问题描述

我有很多地震数据,每个数据文件时长1小时,格式为.txt文件,每个月的数据存到一个文件夹中(例如‘202405’、‘202406’…)。现在我想把这些文件批量转化为.mseed格式,对于每个文件,涉及到的操作主要是:根据txt文件路径,读取txt文件中的数据;利用Obspy库创建Stream对象,定义头文件、数据流;存储该Stream对象为.mseed格式到本地。

编程实现

'''
程序功能:

  采用并行的方式,读取多个月的数据(每个月一个文件夹),将数据转换为mseed格式,并保存到指定文件夹中(每月一个文件夹,需提前创建好)。

Author: guoheng qi
Date: 2024/7/20
'''
from obspy import Stream,Trace
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor

def get_all_files(dir_path):
    """获取目录下所有文件的路径"""  
    file_paths = []
    for root, _, files in os.walk(dir_path):
        for file in files:
            file_paths.append(os.path.join(root, file))
    return file_paths

def read_file(file_path):
    """读取文件并打印文件名"""  
    print(f"Reading file: {file_path}")

    try:
        data = np.loadtxt(file_path)
        st = Stream()
        for i in range(16):
            tr = Trace(data=data[:,i])
            tr.stats.channel='{:02d}'.format(i+1) 
            #!!!!!!!!!!!!!!!!!!!!!  need to modify  !!!!!!!!!!!!!!!!!!!!!

            tr.stats.starttime=file_path[-22:-4]
            tr.stats.sampling_rate=100
            #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

            st +=tr
        st.write(r'..\mseed'+file_path[1:-4]+'.mseed',format='MSEED')
        
    except:
        print('error')

def main():

    # 定义6个文件夹路径

    dir_paths = [r'.\202402', r'.\202403',r'.\202401' ]

    # 获取所有文件路径 
    all_files = [] 
    for dir_path in dir_paths: 
        all_files.extend(get_all_files(dir_path))

    # 定义线程数量

    num_threads = 15
    
    # 使用 ThreadPoolExecutor 进行并行处理

    with ThreadPoolExecutor(max_workers=num_threads) as executor: 
        # 提交任务

        executor.map(read_file, all_files) 

if __name__ == "__main__":
    main()

更多开源干货,请关注我的GITHUB账号:https://github.com/chyiever。

小结

使用concurrent.futures库可以大大简化多线程编程,尤其适合需要并发处理的I/O密集型任务。