python3--队列Queue

进程

multiprocess

Process —— 进程 在python中创建一个进程的模块

  start

  daemon 守护进程

  join 等待子进程执行结束

锁 Lock

acquire release

锁是一个同步控制的工具

如果同一时刻有多个进程同时执行一段代码,

那么在内存中的数据是不会发生冲突的

但是,如果涉及到文件,数据库就会发生资源冲突的问题

我们就需要用锁来把这段代码锁起来

任意一个进程执行了acquire之后,

其他所有的进程都会在这里阻塞,等待一个release

信号量 semaphore

锁 + 计数器

同一时间只能有指定个数的进程执行同一段代码

事件 Event

set clear is_set   控制对象的状态

wait  根据状态不同执行效果也不同

    状态是True ---> pass

    状态是False --> 阻塞

一般wait是和set clear放在不同的进程中

set/clear负责控制状态

wait负责感知状态

我可以在一个进程中控制另外一个或多个进程的运行情况

Queue(队列)的其它方法

from multiprocessing import Process,Queue
q = Queue()print(q.empty())  # 判断队列Queue是否为空print(q.full()) 
 # 判断队列Queue是否满了(不能存放了)

执行结果

True

False

创建队列数量

from multiprocessing import Process, Queue
q = Queue(10)  #设置队列容量为10for i in range(10):
    q.put(i)print(q.qsize())  # 返回队列中目前项目的正确数量(多进程不准)print(q.full()) 
      # 返回队列是否以满(True/False)(多进程不准)q.put(1111)  
      # 阻塞,后面代码不会执行--队列已满print(q.empty())print(q.full())

执行结果

26c5d473d61c13b70eaaccecef4e0aaf.png

注释:

队列可以在创建的时候指定一个容量

如果在程序运行的过程中,队列已经有了足够的数据,再put就会发生阻塞

如果队列为空,在get就会发生阻塞

为什么要设置队列的长度呢?内存有限,不设置可以导致系统崩溃

在进程中使用队列可以完成双向通信

import timeimport osfrom multiprocessing import Process, Queue
def wahaha(q):
    print(q.get(), os.getpid())  # os.getpid()进程id号
    q.put(2)if __name__ == '__main__':
    q = Queue()
    p = Process(target=wahaha, args=(q,))
    p.start()
    q.put(222)
    time.sleep(0.2)
    print(q.get(), os.getppid())  # os.getppid()父进程id号

执行结果

222 1088

2 6336

既打印了主进程put的值,也打印了子进程put的值,在进程中使用队列可以完成双向通信

生产者消费者模型

解决数据供需不平衡的情况

在同一时刻,只能有一个进程来取值,它内部有一个锁的机制。那么另一个进程进来后就会阻塞一会儿,

阻塞的时候非常短队列是进程安全的,内置了锁来保证队列中的每一个数据都不会被多个进程重复取值

import timeimport randomfrom multiprocessing import Process,Queue
def consumer(q,name): #消费者  
    while True:
        food = q.get()
        if food == 'done':break  #如果取到的值为done,则break
        time.sleep(random.random())
        print('{}吃了{}'.format(name, food))
        
def producer(q,name,food): #生产者  
    for i in range(10):
        time.sleep(random.random())
        print('{}生成了{}{}'.format(name, food, i))
        q.put('{}{}'.format(food, i))
        
 if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, 'Sam', '饼干'))
    p2 = Process(target=producer, args=(q, 'Tom', '辣条'))
    p1.start()
    p2.start()
    Process(target=consumer, args=(q, '张三')).start()
    Process(target=consumer, args=(q, '李四')).start()
    p1.join()  # 等待消费者消耗
    p2.join()  # 等待消费者消耗
    q.put('done')  #一个done给进程p1
    q.put('done')  #一个done给进程p2

执行结果

JoinableQueue([maxsize])

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的 

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
q.task_done() 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将
引发ValueError异常。
q.join() 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法
为止。 
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

JoinableQueue队列实现消费之生产者模型

import timeimport randomfrom multiprocessing import Process,JoinableQueue
def consumer(q,name):  # 消费者  
  while True:
        food = q.get()
        time.sleep(random.random())
        print('{}吃了{}'.format(name,food))
        q.task_done()
        
def producer(q,name,food): # 生产者    
    for i in range(10):
        time.sleep(random.random())
        print('{}生产了{}{}'.format(name,food,i))
        q.put('{}{}'.format(food,i))
    q.join()  # 等到所有的数据都被task_done才结束if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'111','热干面'))
    p2 = Process(target=producer,args=(q,'222','炸酱面'))
    p1.start()
    p2.start()
    c1 = Process(target=consumer, args=(q, 'Sam'))
    c2 = Process(target=consumer, args=(q, 'Tom'))
    c1.daemon = True  # 设置为守护进程
    c2.daemon = True  # 设置为守护进程
    c1.start()
    c2.start()
    p1.join()
    p2.join()

执行结果

注释:

producer  生产者

    put  往队列里面添加数据

    生产完,全部的数据就没有其他工作了

    在生产数据方:允许执行q.join,等待消费者取数据

    join会发起一个阻塞,直到所有当前队列中的数据都被消费者取走

consumer  消费者

    get  获取数据

    处理数据

    q.task_done() 告诉q,刚刚从q获取的数据已经处理完了

执行流程:

consumer消费者每完成一个任务就会给q发送一个taskdone

producer生产者在所有的数据都生产完之后会执行q.join()

producer生产者会等待consumer消费完数据才结束

主进程中对producer进程进行join

主进程中的代码会等待producer执行完才结束

producer生产者结束就意味着主进程代码的结束

consumer消费者作为守护进程结束

结束顺序:

consumer消费者中queue中的所有数据被消费

producer生产者 join结束

主进程的代码结束

consumer消费者结束

主进程结束(主进程等待子进程结束才结束)