本文共 19229 字,大约阅读时间需要 64 分钟。
本章内容:
1、进程:本质上就是一段程序的运行过程(抽象概念)
2、线程:最小的执行单元
3、进程:最小的资源单位
4、进程在执行过程中拥有独立的内存单元,而多个线程共享内存。
5、进程是系统进行资源分配和调度的一个独立单位,线程是进程的一个实体,是CPU调度和分派的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和和栈)但是它可与同属一个进程的其他线程共享所拥有的全部资源。
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
#!/usr/bin/env python# -*- coding:utf-8 -*-import threadingimport time def show(arg): time.sleep(1) print ('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print ('main thread stop')
上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。
thread方法:
import threadingimport time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定义每个线程要运行的函数 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
setDaemon(True) :
将线程设置为守护线程,必须在start()方法高用之前设置,如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。 当我们在程序运行中,执行一个主线程,如果主线程以创建一个子线程,主线程和子线程就兵分两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如果线程未完成,则主线程会等待了线程完成后再退出。但是有时候我们需要的是只要主线程完 成,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法了
threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
import threadingfrom time import ctime,sleepimport timedef ListenMusic(name): print ("Begin listening to %s. %s" %(name,ctime())) sleep(3) print("end listening %s"%ctime())def RecordBlog(title): print ("Begin recording the %s! %s" %(title,ctime())) sleep(5) print('end recording %s'%ctime())threads = []t1 = threading.Thread(target=ListenMusic,args=('水手',))t2 = threading.Thread(target=RecordBlog,args=('python线程',))threads.append(t1)threads.append(t2)if __name__ == '__main__': #t1.setDaemon(True) t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:一定在start之前设置 t.start() print(t.getName()) print("count:",threading.active_count()) while threading.active_count()==1: print ("all over %s" %ctime())
线程锁(threading.RLock & threading.Lock)
我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。
import threadingimport time num = 0 lock = threading.RLock() # 实例化锁类 def work(): lock.acquire() # 加锁 global num num += 1 time.sleep(1) print(num) lock.release() # 解锁 for i in range(10): t = threading.Thread(target=work) t.start()
threading.RLock和threading.Lock 的区别
RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
import threading lock = threading.Lock()lock.acquire()lock.acquire() # 产生死锁lock.release()lock.release()
import threading rlock = threading.RLock()rlock.acquire()rlock.acquire() # 在同一线程内,程序不会堵塞。rlock.release()rlock.release()print("end.")
信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s" %n) semaphore.release() if __name__ == '__main__': num= 0 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start()
事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event()for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear()inp = input('input:')if inp == 'true': event_obj.set()
当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。
Condition
Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想得到的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽可能的获取到想要的一个等待者状态。
import threadingimport timedef consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait")def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll")condition = threading.Condition()c1 = threading.Thread(name="c1", target=consumer, args=(condition,))c2 = threading.Thread(name="c2", target=consumer, args=(condition,))p = threading.Thread(name="p", target=producer, args=(condition,))c1.start()time.sleep(2)c2.start()time.sleep(2)p.start()# consumer()线程要等待producer()设置了Condition之后才能继续。
Condition使得线程等待,只有满足某条件时,才释放n个线程
import threadingdef run(n): con.acquire() con.wait() print("run the thread:%s " % n) con.release()if __name__ == "__main__": con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input(">>>") if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
import threadingdef condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return retdef run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release()if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
queue 队列
适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。
queue 方法:
生产者消费者模型
import queueimport threadingque = queue.Queue(10)def put(i): que.put(i) # print("size:", que.qsize())def get(i): get = que.get(i) print("get:", get)for i in range(1, 13): t = threading.Thread(target=put, args=(i,)) t.start()for i in range(1, 11): t = threading.Thread(target=get, args=(i,)) t.start()print("size:", que.qsize())
import queueimport threadingimport timeimport randommessage = queue.Queue(10)def product(num): for i in range(num): message.put(i) print('将{}添加到队列中'.format(i)) time.sleep(random.randrange(0, 1))def consume(num): count = 0 while count
自定义线程池:
# 自定义线程池(一)import queueimport threadingimport timeclass TreadPool: def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in range(max_num): self.queue.put(threading.Thread) def get_thread(self): return self.queue.get() def add_thread(self): self.queue.put(threading.Thread)def func(pool, n): time.sleep(1) print(n) pool.add_thread()p = TreadPool(10)for i in range(1, 100): thread = p.get_thread() t = thread(target=func, args=(p, i,)) t.start()
# 线程池(二)import queueimport threadingimport contextlibimport timeStopEvent = object()class Threadpool: def __init__(self, max_num=10): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] # 以创建线程列表 self.free_list = [] # 以创建的线程空闲列表 def run(self, func, args, callback=None): """ 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需参数 :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) :return: 如果线程池已经终止,则返回True否则None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 创建一个线程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循环去获取任务函数并执行任务函数 """ current_thread = threading.currentThread # 当前线程 self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) status = True except Exception as e: status = False result = e if callback is not None: try: callback(status, result) except Exception as e: pass if self.terminal: event = StopEvent else: with self.worker_state(self.free_list, current_thread): event = self.q.get() # self.free_list.append(current_thread) # event = self.q.get() # self.free_list.remove(current_thread) else: self.generate_list.remove(current_thread) def close(self): """ 执行完所有的任务后,所有线程停止 """ num = len(self.generate_list) while num: self.q.put(StopEvent) num -= 1 def terminate(self): """ 无论是否还有任务,终止线程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() # 清空队列 @contextlib.contextmanager # with上下文管理 def worker_state(self, frelist, val): """ 用于记录线程中正在等待的线程数 """ frelist.append(val) try: yield finally: frelist.remove(val)def work(i): time.sleep(1) print(i)pool = Threadpool()for item in range(50): pool.run(func=work, args=(item,))pool.close()# pool.terminate()
在写代码之前,我们先来看一下该怎么设计这样一个线程池,上面的线程池,我们的队列中,存的是线程类,我们每处理一个任务都实例化一个线程,然后执行完了之后,该线程就被丢弃了,这样有点不合适。我们这次设计的时候,
下面来一下代码是怎么实现的
import threadingimport queueimport timeimport contextlibclass ThreadingPool: def __init__(self, num): self.max = num self.terminal = False self.q = queue.Queue() self.generate_list = [] # 保存已经生成的线程 self.free_list = [] # 保存那些已经完成任务的线程 def run(self, func, args=None, callbk=None): self.q.put((func, args, callbk)) # 将任务信息作为一个元祖放到队列中去 if len(self.free_list) == 0 and len(self.generate_list) < self.max: self.threadstart() def threadstart(self): t = threading.Thread(target=self.handel) t.start() def handel(self): current_thread = threading.current_thread() self.generate_list.append(current_thread) event = self.q.get() while event != 'stop': func, args, callbk = event flag = True try: ret = func(*args) except Exception as e: flag = False ret = e if callbk is not None: try: callbk(ret) except Exception as e: pass if not self.terminal: with self.auto_append_remove(current_thread): event = self.q.get() else: event = 'stop' else: self.generate_list.remove(current_thread) def terminate(self): self.terminal = True while self.generate_list: self.q.put('stop') self.q.empty() def close(self): num = len(self.generate_list) while num: self.q.put('stop') num -= 1 @contextlib.contextmanager def auto_append_remove(self, thread): self.free_list.append(thread) try: yield finally: self.free_list.remove(thread)def f(i): # time.sleep(1) return idef f1(i): print(i)p = ThreadingPool(5)for i in range(20): p.run(func=f, args=(i,), callbk=f1)p.close()
线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心说保持一致。
在windows中不能用fork来创建多进程,因此只能导入multiprocessing,来模拟多进程,下面首先来看一下怎么创建进程,大家可以先猜一下下面的结果是什么
# 进程import multiprocessingl = []def f(i): l.append(i) print('hi', l)if __name__ == '__main__': for i in range(10): p = multiprocessing.Process(target=f, args=(i,)) # 数据不共享,创建10份 l列表 p.start()
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。
数据共享
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
Shared memory
数据可以用Value或Array存储在一个共享内存地图里,如下:
from multiprocessing import Process, Value, Arraydef f(a, b): a.value = 3.111 for i in range(len(b)): b[i] += 100if __name__ == '__main__': num = Value('f', 3.333) # 类似C语言中的 浮点型数 l = Array('i', range(10)) # 类似C语言中的整形数组,长度为10 print(num.value) print(l[:]) p = Process(target=f, args=(num, l)) p.start() p.join() print(num.value)# 大家自己运行一下,看下两次打印结果是否一样 print(l[:])'''结果:3.3329999446868896[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]3.1110000610351562[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]'''
创建num和l 时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
Server process
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) # 输出结果:{0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。
# manage.dict()共享数据from multiprocessing import Process,Manager manage = Manager()dic = manage.dict() def Foo(i): dic[i] = 100+i print (dic.values()) for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join()
当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。
进程锁实例
#!/usr/bin/env python# -*- coding:utf-8 -*-from multiprocessing import Process, Array, RLockdef Foo(lock,temp,i): """ 将第0个数加100 """ lock.acquire() temp[0] = 100+i for item in temp: print ('%s----->%s'%(i,item)) lock.release()lock = RLock()temp = Array('i', [11, 22, 33, 44])for i in range(20): p = Process(target=Foo,args=(lock,temp,i,)) p.start()
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
方法:
apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程
进程池中有两个方法:
from multiprocessing import Poolimport timedef myFun(i): time.sleep(2) print("mytfun",i) return i+100def end_call(arg): print("end_call",arg)if __name__ == "__main__": p = Pool(5) # print(p.map(myFun,range(10))) for i in range(10): p.apply_async(func=myFun,args=(i,),callback=end_call) print("end") p.close() p.join()
官方例程:
from multiprocessing import Pool, TimeoutErrorimport timeimport os def f(x): return x*x if __name__ == '__main__': # 创建4个进程 with Pool(processes=4) as pool: # 打印 "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 使用任意顺序输出相同的数字, for i in pool.imap_unordered(f, range(10)): print(i) # 异步执行"f(20)" res = pool.apply_async(f, (20,)) # 只运行一个进程 print(res.get(timeout=1)) # 输出 "400" # 异步执行 "os.getpid()" res = pool.apply_async(os.getpid, ()) # 只运行一个进程 print(res.get(timeout=1)) # 输出进程的 PID # 运行多个异步执行可能会使用多个进程 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 是一个进程睡10秒 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("发现一个 multiprocessing.TimeoutError异常") print("目前,池中还有其他的工作") # 退出with块中已经停止的池 print("Now the pool is closed and no longer available")
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
# 安装pip install gevent # 导入模块import gevent
greenlet
# greenletfrom greenlet import greenlet def test1(): print(11) gr2.switch() print(22) gr2.switch() def test2(): print(33) gr1.switch() print(44) gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()''' # 输出结果:11332244'''
gevent
# geventimport gevent def foo(): print("Running in foo") gevent.sleep(0) print("Explicit context switch to foo angin") def bar(): print("Explicit context to bar") gevent.sleep(0) print("Implicit context swich back to bar") gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar),]) '''# 输出结果:Running in fooExplicit context to barExplicit context switch to foo anginImplicit context swich back to bar'''
遇到IO自动切换
from gevent import monkeymonkey.patch_all()import geventimport requestsdef f(url): print("FET: %s" % url) resp = requests.get(url) data = len(resp.text) print(url, data)gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'),])
上面的例子,利用协程,一个线程完成所有的请求,发出请求的时候,不会等待回复,而是一次性将所有的请求都发出求,收到一个回复就处理一个回复,这样一个线程就解决了所有的事情,效率极高。
转载地址:http://qhzwi.baihongyu.com/