并发是指多个任务交替执行,并行是指多个任务同时执行。Python提供了多线程和多进程来实现并发编程。
import threading
import time
def worker(name):
print(f"线程 {name} 开始")
time.sleep(2)
print(f"线程 {name} 结束")
# 创建线程
t1 = threading.Thread(target=worker, args=("A",))
t2 = threading.Thread(target=worker, args=("B",))
# 启动线程
t1.start()
t2.start()
# 等待线程完成
t1.join()
t2.join()
print("所有线程完成")
import threading
class WorkerThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"{self.name} 开始工作")
time.sleep(2)
print(f"{self.name} 完成工作")
# 创建并启动线程
threads = []
for i in range(3):
t = WorkerThread(f"Worker-{i}")
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 使用锁保护共享资源
counter += 1
threads = []
for _ in range(5):
t = threading.Thread(target=increment)
t.start()
threads.append(t)
for t in threads:
t.join()
print(f"计数器: {counter}") # 500000
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
print(f"处理任务 {n}")
time.sleep(1)
return n * n
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
futures = [executor.submit(task, i) for i in range(5)]
# 获取结果
for future in futures:
result = future.result()
print(f"结果: {result}")
# 使用map方法
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(task, range(5))
for result in results:
print(f"结果: {result}")
import multiprocessing
import time
def worker(name):
print(f"进程 {name} 开始,PID: {multiprocessing.current_process().pid}")
time.sleep(2)
print(f"进程 {name} 结束")
if __name__ == "__main__":
# 创建进程
p1 = multiprocessing.Process(target=worker, args=("A",))
p2 = multiprocessing.Process(target=worker, args=("B",))
# 启动进程
p1.start()
p2.start()
# 等待进程完成
p1.join()
p2.join()
print("所有进程完成")
from multiprocessing import Pool
import time
def square(n):
time.sleep(1)
return n * n
if __name__ == "__main__":
# 创建进程池
with Pool(processes=4) as pool:
# 并行处理
results = pool.map(square, range(10))
print(results)
# 使用apply_async
with Pool(processes=4) as pool:
results = [pool.apply_async(square, (i,)) for i in range(10)]
output = [r.get() for r in results]
print(output)
from multiprocessing import Process, Queue
def producer(queue):
for i in range(5):
queue.put(f"数据-{i}")
print(f"生产: 数据-{i}")
queue.put(None) # 结束信号
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"消费: {item}")
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from sender")
conn.close()
def receiver(conn):
msg = conn.recv()
print(f"收到消息: {msg}")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p1 = Process(target=sender, args=(child_conn,))
p2 = Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
使用async/await实现协程,适合高并发I/O操作。
import asyncio
async def say_hello(name):
print(f"Hello, {name}")
await asyncio.sleep(1)
print(f"Goodbye, {name}")
# 运行异步函数
asyncio.run(say_hello("张三"))
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"结果-{name}"
async def main():
# 并发执行
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(f"所有结果: {results}")
asyncio.run(main())
import asyncio
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3"
]
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(len(result))
asyncio.run(main())
import time
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def cpu_bound_task(n):
"""CPU密集型任务"""
return sum(i * i for i in range(n))
def io_bound_task(n):
"""I/O密集型任务"""
time.sleep(0.1)
return n
# 串行执行
start = time.time()
results = [cpu_bound_task(1000000) for _ in range(4)]
print(f"串行: {time.time() - start:.2f}秒")
# 多线程(不适合CPU密集型)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task, [1000000] * 4))
print(f"多线程: {time.time() - start:.2f}秒")
# 多进程(适合CPU密集型)
if __name__ == "__main__":
start = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_bound_task, [1000000] * 4))
print(f"多进程: {time.time() - start:.2f}秒")
import queue
import threading
# 线程安全的队列
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
print(f"生产: {i}")
def consumer():
while True:
try:
item = q.get(timeout=1)
print(f"消费: {item}")
q.task_done()
except queue.Empty:
break
threads = []
threads.append(threading.Thread(target=producer))
threads.append(threading.Thread(target=consumer))
for t in threads:
t.start()
for t in threads:
t.join()
# 练习1:多线程打印
import threading
def print_numbers(thread_id):
for i in range(1, 11):
print(f"线程{thread_id}: {i}")
threads = []
for i in range(5):
t = threading.Thread(target=print_numbers, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
# 练习2:多进程计算
from multiprocessing import Pool
def square_sum(numbers):
return sum(x * x for x in numbers)
if __name__ == "__main__":
chunk_size = 250000
chunks = [range(i, i + chunk_size) for i in range(0, 1000000, chunk_size)]
with Pool(processes=4) as pool:
results = pool.map(square_sum, chunks)
total = sum(results)
print(f"平方和: {total}")
# 练习3:生产者-消费者
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(10):
q.put(f"item-{i}")
print(f"生产: item-{i}")
time.sleep(0.1)
q.put(None)
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
time.sleep(0.2)
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()