1 Star 0 Fork 0

读梦人/python_study

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
046-python进程.py 9.38 KB
一键复制 编辑 原始数据 按行查看 历史
读梦人 提交于 2023-08-27 21:36 . 作业
"""
作业:
1、线程与进程的关系与区别
答:
关系:线程是进程中的一部分,一个进程可以包含多个线程,也可以通过线程池管理线程。
区别:
1)概念区别:线程是被进程分配调度的,也就是基于一个CPU单位运作;线程之间可以通过共享变量的方式实现多线程之间通信;
进程属于系统级别,可以通过多子进程、进程池等方法实现多和CPU的调度使用,且拥有独立的内存空间,进程之间数据通信需要依赖Queue等方法。
2)用法区别:线程较轻量级,多线程并发针对IO密集型任务效率更高,且在创建、切换线程时资源开销较小;
进程CPU资源利用更高,并且可以保障多进程之间的数据隔离,保护系统;多进程可以更好的利用多核CPU资源。
2、为什么进程使用要放在__main__中
答:官方解释:__name__ 是当前模块名,当模块被直接运行时模块名为 __main__
理解:当模块被直接运行时,以下代码块将被运行,当模块是被导入时,代码块不被运行。防止代码模块重复调用导入。
附:进程在Linux/Unix中创建的方式为fork;而Windows采用spawn;
由于multiprocessing模块的设计,在windows上运行时,必须使用__main__方法来调用进程;官方建议Linux/Unix系统在编写代码时,也使用__main__方法,保证代码可以多平台的运行。
3、笔记
========================================================第 1 节multiprocessing、子类继承
1、multiprocessing
1)同时启用多个进程; join方法不可针对同一进程多次join,会造成死锁;
2)main函数
2.1)spawn 父进程启动新的python解释器进程;(Windows默认) ·main·方法作为父进程启动
2.2)fork Linux/Unix/MacOS
2.3)forkserver Linux/Unix/MacOS
import time
from multiprocessing import Process
def func(name):
#开始启动子进程
print(f'start test process: {name} \n')
time.sleep(10)
#子进程执行完毕
print(f'end test process: {name} \n')
if __name__ == '__main__':
pl = []
for i in range(5):
name = f'python_process_{i}'
p = Process(target=func, args=(name, ))
#进程启动
p.start()
#直接串行调用,join会阻塞
#print(p)
#p.join()
#进程添加到pl列表中
pl.append(p)
print(pl)
#利用for循环,将5个进程同时发起;多进程工作调用方法
for p in pl:
p.join()
print('join========end')
#主进程由于上份的join方法,被阻塞等待
print('main process finish!')
2、Process子类化:通过继承方式来实现进程
import time
import random
from multiprocessing import Process
class WorkerProcess(Process):
def __init__(self, name, func):
# 等价multiprocessing.Process.__init__(self)
super().__init__()
self.name = name
self.func = func
def run(self):
# 重写runfangfa
self.func(self.name)
def worker(name):
print(f'start work {name}\n')
worker_time = random.choice(range(5, 10))
time.sleep(worker_time)
print(f'{name} work finished in {worker_time} seconds\n')
if __name__ == '__main__':
pl = []
for i in range(5):
process = WorkerProcess(name=f'computer_{i}', func=WorkerProcess.worker)
process.start()
pl.append(process)
for process in pl:
process.join()
======================================================第 2 节
1、进程池
#如果需要大量子进程,可以创建进程池Pool
import os
import random
import time
from multiprocessing import Pool
def long_time_task(name):
print(f'Run task {name}, {os.getpid()}')
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Run task {name} runs {end - start} second')
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
#4个进程池,一般根据CPU个数
p = Pool(4)
#5个任务
for i in range(5):
# 使用异步优势进行
p.apply_async(long_time_task, args=(f'process_{i}', ))
p.close() #进程池关闭,不允许再添加内容
p.join() #阻塞
print('All subprocess done.')
2、进程锁:一般很少使用.(相当于多个进程,以串行方式进行):对多个进程进行启动顺序管理。
#multiprocessing中Lock 与threading中基本一样
import os
import time,random
from multiprocessing import Process,Lock
def f(lock, i):
#with管理锁
with lock:
print(f'{i}:{os.getpid()} is running')
time.sleep(random.randint(1,3))
print(f'{i}:{os.getpid()} is done')
# try:使用try方法,锁、释放锁
# lock.acquire()
# print(f'{i}:{os.getpid()} is running')
# time.sleep(random.random(1,3))
# print(f'{i}:{os.getpid()} is done')
# finally:
# lock.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
p = Process(target=f, args=(lock, num))
p.start()
#p.join()
#p.close()
#2.2)RLock :可重入锁与普通锁方法一样,可以对同一进程进行多次锁+1;
3、进程通信:多个进程之间进行数据互通
#Queue队列
#Pipes管道
#########进程通信结构:
import os
import random
import time
from multiprocessing import Process,Queue
#写数据进程执行代码
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C', 'D']:
print('Put %s to queue ...' % value)
q.put(value) #q为main中的Queue,想队列推送value
time.sleep(random.randint(1, 3))
#读数据进程执行代码
def read(q):
print('Process to read: %s' % os.getpid())
while True: #循环
value = q.get(True) #从Queue get
print('Get %s from queue.' % value)
if __name__ == '__main__':
#父进程创建Queue,传递给子进程
q = Queue()
qw = Process(target=write, args=(q, ))
qr = Process(target=read, args=(q, ))
#qw/qr start
qw.start()
qr.start()
#等待qw执行结束
qw.join()
#qr为死循环,需要kill掉
qr.kill()
###进程通信中的坑:(传值等)
1)不能传递 锁对象
2)修改类对象
可以支持的封存/解封 的对象:
None/True/False
int/float/plural(整型、浮点型、复数)
str/type/bytearray
封存对象集合:tuple/list/set/dict
定义在最外层的函数/内置函数/类
某些类实例,这些类的__dict__属性/__getstate__()函数返回值可以被封存
======================================================第 3 节
1、concurrent.futures中池的原理
用法一样,底层原理不一样;
1)性能池:concurrent.futures.ThreadPoolExecutor(max_workers)
2)进程池:concurrent.futures.ProcessPoolExecutor(max_workers)
作用:1)优化/简化 线程/进程的使用;
2)减少创建线程/进程开销;
3)重复使用线程/进程并不是池的必须得规则,但是程序猿使用池的主要原因。
2、进程池的使用
import time
import concurrent.futures
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
#求sum(为了耗时)
resulte_item = count(x)
#打印输入输出
return resulte_item
def count(number):
for i in range(0, 10000000):
i = i + 1
return i * number
if __name__ == '__main__':
#进程池
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
#executor.submit异步提交
futures = [executor.submit(evaluate_item, item) for item in number_list]
#获取结果
for futures in concurrent.futures.as_completed(futures):
print(futures.result())
3、线程池与进程池的差异
import time
import concurrent.futures
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
#求sum(为了耗时)
resulte_item = count(x)
#打印输入输出
return resulte_item
def count(number):
for i in range(0, 10000000):
i = i + 1
return i * number
if __name__ == '__main__':
#顺序执行:单核cpu跑,CPU密集型运算效率高
start_time = time.time()
for item in number_list:
print(evaluate_item(item))
print('Sequential execution in ' + str(time.time() - start_time) + 'second')
#线程池:单核CPU跑,但是只针对IO密集型效率更高,所以性能不如顺序执行
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print('Sequential execution in ' + str(time.time() - start_time) + 'second')
#进程池:多核CPU跑多条任务,并行更快
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
end_time = time.time()
print('Sequential execution in ' + str(time.time() - start_time) + 'second')
由于进程不受GIL的限制,可以大大提高并行效率,利用多核CPU资源
"""
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/w854090472/python_study.git
git@gitee.com:w854090472/python_study.git
w854090472
python_study
python_study
master

搜索帮助