【python】进阶之并发编程(七)

7 并发编程

并发编程的目的:提高程序处理任务,处理请求的速度/能力。

并发编程的方式:

  • 多进程
  • 多线程
  • 协程

7-1 开启多进程的两种方式

  • 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。

  • 需要强调的是:同一个程序执行两次,那就是两个进程

方式1:Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
from multiprocessing import Process


def func(name):
print(f'{name} is start')
time.sleep(1)
print(f'{name} is done')



if __name__ == '__main__':
# 创建一个子进程, windows下只能使用这种方式,否则报错
p = Process(target=func, args=('jack',))
# 开启子进程
p.start()
print('主进程结束')

方式2:继承Process, 重写方法run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from multiprocessing import Process


def func(name):
print(f'{name} is start')
time.sleep(2)
print(f'{name} is done')


class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name

def run(self): # start会自动调用run
func(self.name)


if __name__ == '__main__':
p = MyProcess("jack")
p.start()
print('主进程结束')

7-2 主进程等待子进程结束

示例1:使用join()优雅的等待子进程结束,不要使用time.sleep

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
from multiprocessing import Process


def func(name):
print(f'{name} is start')
time.sleep(1)
print(f'{name} is done')


if __name__ == '__main__':
p = Process(target=func, args=['jack'])
p.start()
p.join(timeout=1) # time.sleep(10)
print('主进程结束')


# p.join(timeout=1) timeout设置等待时间

示例2:等待多个子进程,使用不当造成串行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
from multiprocessing import Process


def func(name):
print(f'{name} is start')
time.sleep(1)
print(f'{name} is done')


if __name__ == '__main__':
for i in range(5):
p = Process(target=func, args=[str(i)])
p.start()
p.join()
print('主进程结束')

示例3:示例2优化版

1
2
3
4
5
6
7
8
9
10
if __name__ == '__main__':
ps = []
for i in range(5):
p = Process(target=func, args=[str(i)])
p.start()
ps.append(p)

for p in ps:
p.join()
print('主进程结束')

示例4:等待总耗时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
from multiprocessing import Process


def func(i):
print(f'{i} is start')
time.sleep(i)
print(f'{i} is done')


if __name__ == '__main__':
ps = []
start = time.time()
for i in range(1, 4):
p = Process(target=func, args=[i])
p.start()
ps.append(p)

for p in ps:
p.join()
print('主进程结束')
print(time.time() - start) # ~3s

7-3 守护进程

主进程将一个子进程设置为守护进程,守护进程就会在主进程结束时随之结束不再运行

  • 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
  • 在开启子进程之前将其设置为守护进程
  • 主进程代码运行结束,守护进程随即终止,不受别的子进程的影响。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
import time

def task():
print('i am coming')
time.sleep(2)
print('i am backing')


if __name__ == '__main__':
p = Process(target=task)
p.daemon = True # p.start()之前将子进程设置为守护进程,陪着主进程一块结束。
p.start()
time.sleep(0.1)
print('i am master')

示例2:各子进程相互独立,不会因为一个子进程设置为守护进程而影响另一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import time


def task(i):
print(f'{i} am coming')
time.sleep(i)
print(f'{i} am backing')


if __name__ == '__main__':
for i in range(1, 4):
p = Process(target=task, args=[i])
if i == 3: # 编号为3的子进程设置为守护进程,当主进程执行结束后就被回收了
p.daemon = True
p.start()

time.sleep(2)
print('master')

7-4 进程隔离内存空间

进程之间内存空间是隔离的,即进程j间数据是隔离的,互不影响

因为这是两个独立的名称空间。子进程有其自己的名称空间,只是修改其名称空间内全局变量的值,而不影响父进程自己名称空间全局变量的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process

money = 100


def task():
global money
money = 666
print('子', money)


if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print('主', money)

7-5 互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是有问题的。

示例1:进程不共享数据导致的数据混乱问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process, Lock
import json, time, random


# 查票
def search(i):
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
print('用户%s查询余票:%s' % (i, dic.get('ticket_num')))
return dic


def buy(i):
dic = search(i) # 先查票
# 再买票
time.sleep(random.randint(1, 3))
if dic.get('ticket_num') > 0:
dic['ticket_num'] -= 1
with open('data', 'w', encoding='utf8') as f:
json.dump(dic, f)
print('用户%s买票成功' % i)
else:
print('用户%s买票失败' % i)



if __name__ == '__main__':
for i in range(1, 11):
p = Process(target=buy, args=[i])
p.start()

共享带来的是竞争,竞争带来的结果就是错乱。控制的方式就是加锁处理,即每个进程依次使用资源。

示例2:互斥锁,保证数据安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Process, Lock
import json, time, random


# 查票
def search(i):
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
print('用户%s查询余票:%s' % (i, dic.get('ticket_num')))


# 买票 1.先查 2.再买
def buy(i, mutex):
search(i)
mutex.acquire() # 抢锁, 给买票环节加锁处理
time.sleep(random.randint(1, 3))
with open('data', 'r', encoding='utf8') as f:
dic = json.load(f)
ticket_num = dic["ticket_num"]
if ticket_num > 0:
dic['ticket_num'] -= 1
with open('data', 'w', encoding='utf8') as f:
json.dump(dic, f)
print('用户%s买票成功' % i)
else:
print('用户%s买票失败' % i)
mutex.release() # 释放锁


if __name__ == '__main__':
# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
mutex = Lock()
for i in range(1, 11):
p = Process(target=buy, args=[i, mutex])
p.start()

加锁处理的结果就是:将并发变成串行,牺牲效率,保证数据的安全

7-6 多线程

基础知识:

  • 一个程序的运行过程是一个进程,每个进程自带一个控制线程
  • 进程是一个资源单位,线程是具体的执行单位
  • 进程相当于一个车间,线程相当于车间内的流水线,车间内可以有多条流水线,即一个进程内可以有多个线程。
  • 多线程之间共享数据资源,相当于一个车间内有多条流水线,都共用一个车间的资源。
1
2
3
4
5
6
进程: 资源单位(起一个进程是在内存空间中开辟一块独立的空间)
线程: 执行单位(真正被干活的是进程里面的线程,线程在执行中所需要使用到的资源都找所在的进程索要)

# 开进需要开内存空间,程资源消耗大
# 开线程无需再次申请内存空间操作,消耗小
# 开线程的开销要远远的小于进程的开销。同一个进程下的多个线程数据是共享的

为什么使用多线程:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。

示例:开线程的两种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from threading import Thread


def task():
print('我是子线程')


# 方式1:直接使用Thread实例化线程对象
if __name__ == '__main__':
t = Thread(target=task)
t.start()
print('我是主线程')


# 方式2:继承Therad,自定自己的线程类,重写run方法
class MyThread(Thread):
def run(self):
task()


if __name__ == '__main__':
t = MyThread()
t.start()
print('我是主线程')

7-7 线程共享内存空间

补充:线程的知识点和进程的知识点类似,可以举一反三的学习。

同一个进程下的多个线程是共享当前进程下的数据资源的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from threading import Thread

n = 100


def func():
global n
n = 200
print('子线程:', n) # 110


if __name__ == '__main__':
t = Thread(target=func)
t.start()
# t.join()
print('主线程:', n)

7-8 GIL

全局解释器锁(GIL, Global Interpreter Lock)。

结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

1
2
3
# 程序的代码需要交给解释器执行,因此该进程内的每个线程都需要将共享的代码交给解释器执行,这就有了竞争。
# 垃圾回收机制GC也是当前进程内的一个干活的线程,GC会和当前进程内其他线程有抢代码数据的竞争,因此有了GIL。
# GIL保证进程内同一时刻只有一个线程在运行,这样做是为了保证内存管理(垃圾回收机制GC)的运行。

总结:

  • GIL不是python语言的特性,它是Cpython解释器引用的一个概念。
  • GIL本质是一把互斥锁,是将线程从并发变为串行,牺牲效率保证数据安全。
  • 保护不同的数据要加不同的互斥锁,GIL保护的是解释器级别的数据安全。

image-20220514173429195

首先明确:python多线程无法利用计算机的多核优势

但是python多线程可以实现并发的,且不是所有的场合都要使用多核的。

场合:

  • 对于计算密集型,多进程优于多线程(多进程的并行计算优势突出了)
  • 对于IO密集型,多线程优于多进程(开进程的开销大缺陷放大了)

结论:python多线程无法使用多核优势,不等于多线程一无是处

7-9 进程池和线程池

基础知识:

  • 无论是开进程还是开线程,都需要消耗资源,只不过开线程的比开进程的消耗更少。
  • 不可能无限制的开进程或线程,因为计算机硬件的资源是有限的。
  • 为了保证服务器一定的处理能力,只能采取的概念。
  • 池:就是限制开进程或开线程的数量,比如开多线程,限制最多开10个线程,那这个线程池的大小就是10,也就是说最多只有10个线程在干活。

示例1:线程池和进程池的基本使用(都是使用concurrent.futures模块)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from concurrent.futures import ThreadPoolExecutor
import time
import random


def task(n): # 处理任务的函数
time.sleep(random.randint(1, 5))
print(n ** 2)



if __name__ == '__main__':
pool = ThreadPoolExecutor(5) # 开一个大小是5的线程池
for i in range(1, 10):
pool.submit(task, i) # 提交任务和任务需要的参数

示例2:增加回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor
import time
import random


def task(n): # 处理任务的函数
time.sleep(random.randint(1, 5))
return n ** 2


def call_back(future): # 需要定义一个形参,这个形参是future,通过future.result()获取任务执行返回值。
print('call_back>>>:', future.result())


if __name__ == '__main__':
pool = ThreadPoolExecutor(5)
for i in range(1, 10):
pool.submit(task, i).add_done_callback(call_back)
# 给任务绑定回调函数,任务结束后自动调用回调函数,并将future对象当实参传给回调函数。

示例3:map函数快速获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from concurrent.futures import ThreadPoolExecutor
import time
import random


def task(n): # 处理任务的函数
time.sleep(random.randint(1, 5))
return n ** 2


if __name__ == '__main__':
pool = ThreadPoolExecutor(5)
rets = pool.map(task, range(1, 10))
for r in rets:
print(r)
# map第一个参数是任务,第二个参数是迭代器,迭代器的每个元素依次传给任务当实参
# map返回一个生成器,可以直接获取每个任务的结果

总结

  • 池子一旦造出来后,固定了线程或进程 ,不会再变更,所有的任务都是这些进程或线程处理。
  • 这些进程或线程不会再出现重复创建和销毁的过程。
  • 任务的提交是异步的,异步提交任务的返回结果 应该通过回调机制来获取。
  • 回调机制就相当于,把任务交给一个员工完成,它完成后主动找你汇报完成结果。

7-10 协程【伪】

协程是单线程下的并发,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;单线程内就可以实现并发的效果,最大限度地利用cpu

示例1:基于yield实现代码块切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def f1():
yield 1
yield from f2()
yield 2


def f2():
yield 3
yield 4


generator = f1() # f1()是生成器

for i in generator:
print(i) # 依次打印:1 3 4 2

示例2:使用asyncio实现协程【官方推荐】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio


async def f1():
print(1)
await asyncio.sleep(2) # 等 可以等待的对象
print(2)


async def f2():
print(3)
await asyncio.sleep(2)
print(4)


tasks = [
asyncio.ensure_future(f1()),
asyncio.ensure_future(f2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

严格来说,其实Python并没有协程,是一种伪协程,因为python的协程只是一种异步编码思想,而没有一个完整生命周期


【python】进阶之并发编程(七)
http://example.com/2024/03/20/607python进阶之并发编程(七)/
作者
Wangxiaowang
发布于
2024年3月20日
许可协议