Python队列Queue的增删改查和多线程处理,简版

文摘   科技   2023-09-12 18:25   北京  


ISEE小语


三毛曾说过一句话:
我们不肯探索自己本身的价值,我们过分看重他人在自己生命里的参与。于是,孤独不再美好,失去了他人,我们惶惑不安。




回顾上篇



Python爬虫某阁网站的在线小说,详细新版

ISEE,公众号:ISEE小栈Python爬虫某阁网站的在线小说,详细新版


开始本篇


在Python标准库中,queue模块用处比较多,它提供了实现多线程编程中常用的队列(Queue)数据结构的类。队列是一种先进先出(FIFO)的数据结构,用于在多个线程之间安全地传递数据。

queue模块中最常用的类是Queue类。它实现了一个线程安全的队列,可以在多线程环境中安全地进行入队和出队操作,避免了线程之间的竞争条件。



环境:

Pycharm

Python 3.9.16



导入:

import timeimport datetimeimport threadingfrom queue import Queue

(左右滑动查看完整代码)

这些以下操作会用到




Queue的基本使用


Python的队列在内存空间允许的前提下是可以不断增加元素的。

但需要注意,如果队列元素数量过大,Python的内存空间很容易就会占用完毕,导致OutOfMemoryError或其他类似错误。


创建队列


创建队列,分为队列大小无限制指定队列大小

队列大小无限制,默认maxsize=0

queue = Queue(maxsize=0)

(左右滑动查看完整代码)

指定队列的大小,有限制,其中n>0

queue = Queue(maxsize=n)

(左右滑动查看完整代码)



添加元素


添加元素,有两种,分为put和put_wait

第一种:put(item,block=True,timeout=None)

item-参数是要添加到队列中的元素。

block-参数指定当队列已满时的行为。如果block=True(默认值),则put()方法会阻塞直到有空间可用。如果block=False,则put()方法会立即引发Queue.Full异常。

timeout-参数指定在队列已满并且block=True时的最长等待时间(以秒为单位)。如果在指定的时间内队列仍然是满的,则引发Queue.Full异常。

queue.put("A", block=True, timeout=None)queue.put("B", block=True, timeout=None)queue.put("C", block=True, timeout=None)queue.put("D", block=True, timeout=None)

(左右滑动查看完整代码)


第二种:put_nowait(item)

item-参数是要添加到队列中的元素。

put_nowait()方法不会阻塞。如果队列已满,则立即引发Queue.Full异常。类似是put()方法中的block=False的效果

queue.put_nowait("E")print("添加元素完成后的队列:", queue.queue)

(左右滑动查看完整代码)

结果:



获取元素


通过索引,获取队列头部元素

获取元素是基于以上创建的队列和添加元素

front_element = queue.queue[0]print("队列头部的元素:", front_element)

(左右滑动查看完整代码)

果:




修改元素


通过索引,修改队列中的元素

这种情况通常是前提知道所修改元素的索引

如,修改队列的第一个元素为M

queue.queue[0] = "M"print("修改后的队列:", queue.queue)

(左右滑动查看完整代码)

果:




查找元素


在以上队列中,查找“B”元素是否存在

element_to_find = "B"if element_to_find in queue.queue:    print("元素", element_to_find, "存在!")else:    print("元素", element_to_find, "不存在!")

(左右滑动查看完整代码)

果:




删除元素


删除元素,使用的是get()和get_nowait()方法,它们都是指取出的是队列的元素,并将其从队列中移除。

get()方法,在队列为空时进行阻塞等待。面可带参数get(block=True, timeout=None)

    block参数block=True(默认值),则get()方法会阻塞直到有空间可用。如果block=False,则get()方法会立即引发_queue.Empty异常。

    timeout参数指定在队列为空并且block=True时的最长等待时间(以秒为单位)。如果在指定的时间内队列仍然是空的,则引发_queue.Empty异常。

如果只删除头部队列

removed_element = queue.get()print("本次删除的元素:", removed_element)print("当前队列剩下元素:", queue.queue)

(左右滑动查看完整代码)

果:




循环删除队列中的元素

其中使用到了queue.empty(),是指判断队列是否为空

while not queue.empty():    element = queue.get()    print("取出的元素并移除:", element)print("当前队列剩下元素:", queue.queue)

(左右滑动查看完整代码)

果:


此时,队列已经空,那么试一下

element = queue.get(block=True, timeout=10)print("queue中get()方法:", element)

等10秒钟,结果:



get_nowait()方法,只能用于非阻塞地获取队首元素,如果队列为空,它会立即引发Empty异常。类似于get(block=False),试一下:

element = queue.get_nowait()print("queue中get_nowait()方法:", element)

直接抛出异常:




Queue多线程使用


queue模块提供了一种简单而强大的机制来实现线程间的数据共享和同步操作,可以有效地处理多线程编程中的并发问题。


任务处理函数


首先,我们先定义一个任务处理的函数,实际使用的时候,这里面处理的是核心业务

def process_task(task):    # 处理任务的逻辑    print("Processing task:", task, "执行时间:", datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S"))

(左右滑动查看完整代码)



工作线程


其次,我们先定义一个线程工作的函数,实际使用的时候,这里面获取队列中的数据,调用以上任务处理的函数

其中使用到了queue.task_done(),是指表示前一个添加到队列的任务已完成

def worker():    while True:        # 从队列中获取任务        task = queue.get()        # 处理任务        process_task(task)        # 标记任务完成        queue.task_done()

(左右滑动查看完整代码)



创建队列并添加任务


本次,我们添加任务并存入到队列中,实际使用的时候,这队列里面的元素,是供给工作线程中,处理任务的调用参数。

其中使用到了queue.qsize(),是指当前队列中的元素个数

# 创建一个队列queue = Queue()tasks = ["task01", "task02", "task03", "task04", "task05", "task06", "task07", "task08", "task09", "task10", "task11",         "task12", "task13", "task14", "task15"]for task in tasks:    queue.put(task)print("添加到队列的任务:", queue.queue)print("队列中任务的数量:", queue.qsize())

(左右滑动查看完整代码)

结果:



多线程使用


最后,直接使用,看效果。直接使用都是直接调用上面封装好的工作线程worker()即可

其中使用到了queue.join(),指阻塞调用线程,直到队列中的所有任务都被处理完毕

第一:直接创建并启动工作线程

thread = threading.Thread(target=worker)thread.start()queue.join()print("所有任务执行完成!")

(左右滑动查看完整代码)

结果:


第二:自定义工作线程数并启动

如,本次指定5个线程,设置启动线程时间间隔为1秒

# 创建并启动5个工作线程num_threads = 5threads = []# 设置启动线程的时间间隔为1秒interval = 1for _ in range(num_threads):    thread = threading.Thread(target=worker)    thread.start()    threads.append(thread)    # 设置启动线程的时间间隔    time.sleep(interval)# 直到所有任务都完成queue.join()print("所有任务执行完成!")

(左右滑动查看完整代码)

结果:


可以看到,以上两个多线程使用方式,都是在同一秒执行任务,并发的效果相当明显。



总结

queue模块提供了一种简单而强大的机制来实现线程间的数据共享和同步操作,可以有效地处理多线程编程中的并发问题。

使用场景居多,不限于至服务开发、数据分析、爬虫、性能测试等。




寄语:世间三美,明月,清风,眼前……



看到这儿的朋友帮点个“”和“在看”,谢谢支持~!

     

文章就分享到这儿,喜欢就点个吧!




推荐阅读  点击标题可跳转


ISEE小栈
没有花里胡哨,简单才是王道。
 最新文章