Lesson 12: Multithreading and Multiprocessing

Concurrency and Parallelism

Concurrency means multiple tasks make progress by taking turns; parallelism means multiple tasks run at the same time. Python provides multithreading and multiprocessing for concurrent programming.
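Note that CPython's Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads mainly help with I/O-bound work; CPU-bound parallelism requires multiple processes, as the performance comparison later in this lesson shows.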

Multithreading (threading)

Creating Threads

import threading
import time

def worker(name):
    print(f"Thread {name} starting")
    time.sleep(2)
    print(f"Thread {name} finished")

# Create threads
t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))

# Start the threads
t1.start()
t2.start()

# Wait for the threads to finish
t1.join()
t2.join()

print("All threads finished")

Creating Threads with a Class

import threading
import time

class WorkerThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} starting work")
        time.sleep(2)
        print(f"{self.name} finished work")

# Create and start threads
threads = []
for i in range(3):
    t = WorkerThread(f"Worker-{i}")
    t.start()
    threads.append(t)

# Wait for all threads to finish
for t in threads:
    t.join()

Thread Synchronization - Lock

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # use the lock to protect the shared resource
            counter += 1

threads = []
for _ in range(5):
    t = threading.Thread(target=increment)
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(f"计数器: {counter}")  # 500000

Thread Pool (ThreadPoolExecutor)

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Processing task {n}")
    time.sleep(1)
    return n * n

# Create a thread pool
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks
    futures = [executor.submit(task, i) for i in range(5)]

    # Collect the results
    for future in futures:
        result = future.result()
        print(f"Result: {result}")

# Using the map method
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, range(5))
    for result in results:
        print(f"Result: {result}")

Multiprocessing (multiprocessing)

Creating Processes

import multiprocessing
import time

def worker(name):
    print(f"Process {name} starting, PID: {multiprocessing.current_process().pid}")
    time.sleep(2)
    print(f"Process {name} finished")

if __name__ == "__main__":
    # Create processes
    p1 = multiprocessing.Process(target=worker, args=("A",))
    p2 = multiprocessing.Process(target=worker, args=("B",))

    # Start the processes
    p1.start()
    p2.start()

    # Wait for the processes to finish
    p1.join()
    p2.join()

    print("All processes finished")

Process Pool (Pool)

from multiprocessing import Pool
import time

def square(n):
    time.sleep(1)
    return n * n

if __name__ == "__main__":
    # Create a process pool
    with Pool(processes=4) as pool:
        # Process the items in parallel
        results = pool.map(square, range(10))
        print(results)

    # Using apply_async
    with Pool(processes=4) as pool:
        results = [pool.apply_async(square, (i,)) for i in range(10)]
        output = [r.get() for r in results]
        print(output)

Inter-Process Communication - Queue

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(5):
        queue.put(f"data-{i}")
        print(f"Produced: data-{i}")
    queue.put(None)  # sentinel signalling the end

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    queue = Queue()

    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Inter-Process Communication - Pipe

from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.close()

def receiver(conn):
    msg = conn.recv()
    print(f"收到消息: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Asynchronous Programming (asyncio)

Coroutines are written with async/await and are well suited to highly concurrent I/O workloads.

Basic Async Functions

import asyncio

async def say_hello(name):
    print(f"Hello, {name}")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}")

# Run the async function
asyncio.run(say_hello("张三"))

Running Multiple Coroutines Concurrently

import asyncio

async def task(name, delay):
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} finished")
    return f"result-{name}"

async def main():
    # Run the coroutines concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(f"All results: {results}")

asyncio.run(main())

Asynchronous HTTP Requests
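Note: this example depends on the third-party aiohttp package (pip install aiohttp), and the api.example.com URLs are placeholders.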

import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "https://api.example.com/1",
        "https://api.example.com/2",
        "https://api.example.com/3"
    ]

    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)

    for result in results:
        print(len(result))

asyncio.run(main())

Performance Comparison Example

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    """CPU-bound task"""
    return sum(i * i for i in range(n))

def io_bound_task(n):
    """I/O-bound task"""
    time.sleep(0.1)
    return n

if __name__ == "__main__":
    # Serial execution
    start = time.time()
    results = [cpu_bound_task(1000000) for _ in range(4)]
    print(f"Serial: {time.time() - start:.2f}s")

    # Multithreading (not suitable for CPU-bound tasks)
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task, [1000000] * 4))
    print(f"Threads: {time.time() - start:.2f}s")

    # Multiprocessing (suitable for CPU-bound tasks)
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task, [1000000] * 4))
    print(f"Processes: {time.time() - start:.2f}s")

Thread-Safe Data Structures

import queue
import threading

# A thread-safe queue
q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print(f"生产: {i}")

def consumer():
    while True:
        try:
            item = q.get(timeout=1)
            print(f"消费: {item}")
            q.task_done()
        except queue.Empty:
            break

threads = []
threads.append(threading.Thread(target=producer))
threads.append(threading.Thread(target=consumer))

for t in threads:
    t.start()

for t in threads:
    t.join()
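The task_done() calls above pair with queue.Queue.join(), which blocks until every item that was put has been marked done. A brief sketch of that pattern, using a None sentinel instead of a timeout:

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:  # sentinel: stop the worker
            q.task_done()
            break
        print(f"Processing: {item}")
        q.task_done()  # mark this item as handled

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)
q.join()  # blocks until task_done() has been called for every item above

q.put(None)  # shut the worker down
t.join()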

Best Practices

  1. Use multithreading or asyncio for I/O-bound work; because of the GIL, threads do not speed up CPU-bound Python code
  2. Use multiprocessing (or ProcessPoolExecutor) for CPU-bound work
  3. Protect shared state with a Lock, or exchange data through thread-safe/process-safe queues instead of sharing it directly
  4. Put multiprocessing startup code behind an if __name__ == "__main__" guard

Exercises

  1. Create 5 threads, each printing the numbers 1-10
  2. Use multiple processes to compute the sum of the squares of all numbers from 1 to 1,000,000
  3. Implement a producer-consumer model that communicates through a Queue
  4. Use asyncio to download multiple web pages concurrently
Exercise answers:
# Exercise 1: printing from multiple threads
import threading

def print_numbers(thread_id):
    for i in range(1, 11):
        print(f"线程{thread_id}: {i}")

threads = []
for i in range(5):
    t = threading.Thread(target=print_numbers, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

# Exercise 2: computing with multiple processes
from multiprocessing import Pool

def square_sum(numbers):
    return sum(x * x for x in numbers)

if __name__ == "__main__":
    chunk_size = 250000
    # Split 1..1000000 into four chunks and square-sum each chunk in parallel
    chunks = [range(i, i + chunk_size) for i in range(1, 1000001, chunk_size)]

    with Pool(processes=4) as pool:
        results = pool.map(square_sum, chunks)

    total = sum(results)
    print(f"Sum of squares: {total}")

# Exercise 3: producer-consumer
from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(10):
        q.put(f"item-{i}")
        print(f"生产: item-{i}")
        time.sleep(0.1)
    q.put(None)

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")
        time.sleep(0.2)

if __name__ == "__main__":
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
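
The answers above stop at exercise 3; for exercise 4, a minimal sketch assuming the third-party aiohttp package and placeholder URLs:

# Exercise 4: download multiple web pages concurrently with asyncio
# (sketch; assumes aiohttp is installed, URLs are placeholders)
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
    ]
    # Share one session and fetch all pages concurrently
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(fetch(session, url) for url in urls))
    for url, page in zip(urls, pages):
        print(f"{url}: {len(page)} characters")

if __name__ == "__main__":
    asyncio.run(main())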