三门峡市中国丧葬服务网

Python多进程与多线程适用场景案例分析

2026-05-07 16:19:02 浏览次数:0
详细信息

1. 核心差异概览

特性 多线程 多进程
内存 共享内存 独立内存空间
启动开销 较小 较大
数据共享 容易(但有锁问题) 需要IPC机制
受GIL影响 是(CPU密集型受限) 否
适用场景 I/O密集型、GUI CPU密集型、需要隔离

2. 多线程适用场景案例

案例1:网络请求密集型

import threading
import requests
import time

def download_page(url, results, index):
    """Fetch a single web page and record its character count in results[index].

    On any failure the slot is set to 0 and the error is printed instead of
    raised, so one bad URL cannot kill the worker thread.
    """
    try:
        resp = requests.get(url, timeout=5)
    except Exception as e:
        results[index] = 0
        print(f"Error downloading {url}: {e}")
    else:
        size = len(resp.text)
        results[index] = size
        print(f"Downloaded {url}: {size} chars")

def multithread_download(urls):
    """Download all URLs concurrently, one worker thread per URL.

    Returns a list of page sizes (in characters), index-aligned with urls.
    Threads suit this workload: the GIL is released while waiting on I/O.
    """
    results = [None] * len(urls)
    started = time.time()

    workers = [
        threading.Thread(target=download_page, args=(url, results, pos))
        for pos, url in enumerate(urls)
    ]
    for worker in workers:
        worker.start()
    # Wait for every download to finish before reporting.
    for worker in workers:
        worker.join()

    print(f"Total time: {time.time() - started:.2f}s")
    return results

# Demo: each test URL responds after an artificial server-side delay, so the
# threaded version should finish in roughly max(delays), not sum(delays).
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/3"
]

# FIX: guard the network call so it only runs when executed as a script,
# not as a side effect of importing this module.
if __name__ == "__main__":
    multithread_download(urls)

案例2:GUI应用(保持响应)

import tkinter as tk
import threading
import time

class GUIApp:
    """Minimal Tk window that runs a slow task on a worker thread so the
    event loop (and therefore the UI) stays responsive.
    """

    def __init__(self):
        # BUG FIX: the original called tk.Progressbar, which raises
        # AttributeError -- the Progressbar widget lives in tkinter.ttk.
        from tkinter import ttk

        self.root = tk.Tk()
        self.root.title("多线程GUI示例")

        # Tk variable that the background task advances from 0 to 100.
        self.progress = tk.DoubleVar()
        tk.Label(self.root, text="后台任务示例").pack()
        ttk.Progressbar(self.root, variable=self.progress, length=200).pack()

        # One button starts the slow job; the other proves the UI still reacts.
        tk.Button(self.root, text="启动耗时任务", 
                 command=self.start_background_task).pack()
        tk.Button(self.root, text="点击我(测试响应)",
                 command=self.show_response).pack()

    def long_running_task(self):
        """Simulate a slow job: 100 steps of 50 ms each, updating progress."""
        # NOTE(review): setting a Tk variable from a non-GUI thread works on
        # common builds but is not guaranteed thread-safe; for strict
        # correctness marshal updates through self.root.after -- confirm.
        for i in range(1, 101):
            time.sleep(0.05)  # simulated work
            self.progress.set(i)
        print("任务完成!")

    def start_background_task(self):
        """Launch the slow task on a daemon thread so the GUI keeps running."""
        thread = threading.Thread(target=self.long_running_task)
        thread.daemon = True  # don't block interpreter exit
        thread.start()

    def show_response(self):
        """Click handler demonstrating the event loop is still responsive."""
        print("GUI仍然响应!")

    def run(self):
        """Enter the Tk main loop (blocks until the window is closed)."""
        self.root.mainloop()

# app = GUIApp()
# app.run()

3. 多进程适用场景案例

案例1:CPU密集型计算

import multiprocessing
import time
import math

def cpu_intensive_task(n):
    """Return the sum of sqrt(i) for every integer i in [0, n)."""
    # sum() accumulates left-to-right from 0, matching the manual loop exactly.
    return sum(map(math.sqrt, range(n)))

def multiprocess_calculation():
    """Run four CPU-heavy workloads in parallel on a 4-worker process pool.

    Returns the list of results, one per workload, in input order.
    """
    t0 = time.time()

    workloads = [1000000, 1500000, 2000000, 2500000]
    # Pool.map fans the work out across worker processes, sidestepping the GIL.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, workloads)

    elapsed = time.time() - t0
    print(f"多进程计算结果: {results}")
    print(f"总耗时: {elapsed:.2f}秒")
    return results

# 对比单进程版本
def single_process_calculation():
    """Sequential baseline used to compare against multiprocess_calculation."""
    t0 = time.time()
    results = [cpu_intensive_task(n) for n in [1000000, 1500000, 2000000, 2500000]]
    elapsed = time.time() - t0
    print(f"单进程计算结果: {results}")
    print(f"总耗时: {elapsed:.2f}秒")
    return results

# 测试对比
# Run the sequential baseline first, then the pooled version, so the two
# timing printouts can be compared directly.  The __main__ guard is required:
# on Windows/macOS multiprocessing re-imports this module in child processes.
if __name__ == "__main__":
    print("=== CPU密集型任务测试 ===")
    single_process_calculation()
    multiprocess_calculation()

案例2:数据处理的Pipeline

import multiprocessing
import os
from multiprocessing import Process, Queue

def stage1(raw_data_queue, processed_queue):
    """Pipeline stage 1: double every element of each incoming batch.

    Consumes batches from raw_data_queue until a None sentinel arrives;
    the sentinel is forwarded downstream before the worker exits.
    """
    while True:
        batch = raw_data_queue.get()
        if batch is None:  # shutdown sentinel
            processed_queue.put(None)
            break

        doubled = [item * 2 for item in batch]
        processed_queue.put(doubled)
        print(f"Stage1 PID {os.getpid()}: processed {len(batch)} items")

def stage2(processed_queue, result_queue):
    """Pipeline stage 2: reduce each batch to its arithmetic mean.

    Consumes batches from processed_queue until a None sentinel arrives;
    the sentinel is forwarded to result_queue before the worker exits.
    """
    while True:
        batch = processed_queue.get()
        if batch is None:  # shutdown sentinel
            result_queue.put(None)
            break

        mean = sum(batch) / len(batch)
        result_queue.put(mean)
        print(f"Stage2 PID {os.getpid()}: analysis result {mean:.2f}")

def pipeline_processing():
    """Drive a two-stage multiprocess pipeline and collect its outputs.

    Feeds 10 batches of 100 consecutive integers through stage1 -> stage2,
    shuts the pipeline down via a None sentinel that propagates through both
    stages, and returns the per-batch analysis results.
    """
    raw_queue = Queue()
    processed_queue = Queue()
    result_queue = Queue()

    workers = [
        Process(target=stage1, args=(raw_queue, processed_queue)),
        Process(target=stage2, args=(processed_queue, result_queue)),
    ]
    for worker in workers:
        worker.start()

    # Enqueue ten batches of 100 consecutive integers each.
    for batch_no in range(10):
        raw_queue.put(list(range(batch_no * 100, (batch_no + 1) * 100)))
    raw_queue.put(None)  # sentinel: tells the pipeline to shut down

    # Drain results until the forwarded sentinel arrives.  Draining before
    # join() matters: a child cannot exit while its queue feeder is blocked.
    results = []
    while True:
        item = result_queue.get()
        if item is None:
            break
        results.append(item)

    for worker in workers:
        worker.join()

    print(f"处理完成,得到 {len(results)} 个结果")
    return results

4. 混合使用场景

案例:CPU密集型 + I/O密集型混合

import concurrent.futures
import time
import math
import requests

def io_bound_task(url):
    """Fetch url and return the length of the response body (I/O-bound)."""
    resp = requests.get(url, timeout=5)
    return len(resp.text)

def cpu_bound_task(n):
    """Return the sum of sqrt(i) for i in [0, n) (CPU-bound)."""
    total = 0
    for i in range(n):
        total += math.sqrt(i)
    return total

def hybrid_approach():
    """Overlap I/O-bound downloads (threads) with CPU-bound math (processes).

    Returns a (io_results, cpu_results) tuple.
    """
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]
    numbers = [100000, 200000, 300000, 400000]

    t0 = time.time()

    # Threads overlap the network waits; processes run the math on all cores.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as io_executor:
        io_futures = [io_executor.submit(io_bound_task, u) for u in urls]

        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as cpu_executor:
            cpu_futures = [cpu_executor.submit(cpu_bound_task, n) for n in numbers]

            io_results = [fut.result() for fut in io_futures]
            cpu_results = [fut.result() for fut in cpu_futures]

    total_time = time.time() - t0
    print(f"I/O结果: {io_results}")
    print(f"CPU结果: {cpu_results[:2]}...")  # show only the first two
    print(f"总耗时: {total_time:.2f}秒")
    return io_results, cpu_results

5. 选择指南总结

使用多线程的场景:

网络请求处理——爬虫、API调用;文件I/O操作——读写大量小文件;数据库操作——查询/写入数据库;GUI应用——保持界面响应;Web服务器——处理HTTP请求(如Flask开发服务器)

使用多进程的场景:

科学计算——NumPy/Pandas数据处理;图像/视频处理——OpenCV操作;机器学习——模型训练、特征工程;大规模数据处理——需要绕过GIL限制;需要隔离的任务——避免一个任务崩溃影响其他

实践建议:

优先使用concurrent.futures——更高层次的抽象;小任务用线程池,大计算用进程池;I/O密集型:threading或asyncio;CPU密集型:multiprocessing;数据共享少用进程,多用队列通信;考虑使用joblib、dask等高级库

代码示例:智能选择

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import functools

def smart_executor(task_type='io', max_workers=None):
    """Return an executor suited to the workload type.

    task_type: 'cpu' -> ProcessPoolExecutor (true parallelism, bypasses the
               GIL); any other value -> ThreadPoolExecutor (good for I/O waits).
    max_workers: pool size; defaults to os.cpu_count() for 'cpu', 10 otherwise.
    """
    if task_type == 'cpu':
        # BUG FIX: the original referenced multiprocessing.cpu_count(), but
        # this snippet never imports multiprocessing -> NameError at runtime.
        import os
        return ProcessPoolExecutor(max_workers or os.cpu_count())
    # 'io' or any other value falls back to a thread pool.
    return ThreadPoolExecutor(max_workers or 10)

# 使用示例
def process_data_batch(data_batch, task_type='io', process_function=None):
    """Apply a function to every item of data_batch with a suitable pool.

    data_batch: iterable of work items.
    task_type: 'cpu' or 'io' -- forwarded to smart_executor.
    process_function: callable applied to each item.  BUG FIX: the original
        read a module-level name `process_function` that is never defined
        anywhere, so every call raised NameError; taking it as a keyword
        parameter (new, defaulted -- backward compatible) makes the call
        site explicit.

    Returns the list of per-item results, in input order.
    Raises TypeError if no callable is supplied.
    """
    if process_function is None:
        raise TypeError("process_data_batch requires a process_function callable")
    with smart_executor(task_type) as executor:
        return list(executor.map(process_function, data_batch))

关键点:理解GIL的影响是选择多进程还是多线程的关键。对于Python,I/O等待时GIL会释放,所以I/O密集型任务多线程效果很好;CPU密集型任务需要多进程来真正利用多核。

相关推荐