当前位置:脚本大全 > > 正文

python中的多线程详解(python多线程抽象编程模型详解)

时间:2022-01-19 00:57:49类别:脚本大全

python中的多线程详解

python多线程抽象编程模型详解

最近需要完成一个多线程下载的工具,对其中的多线程下载进行了一个抽象,可以对所有需要使用到多线程编程的地方统一使用这个模型来进行编写。

主要结构:

1、基于Queue标准库实现了一个类似线程池的工具,用户指定提交任务线程submitter与工作线程worker数目,所有线程分别设置为后台运行,提供等待线程运行完成的接口。

2、所有需要完成的任务抽象成task,提供单独的无参数调用方式,供worker线程调用;task以生成器的方式作为参数提供,供submitter调用。

3、所有需要进行线程交互的信息放在context类中。

主要实现代码如下:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • #Submitter线程类实现,主要是`task_generator`调用
  • class SubmitterThread(threading.Thread):
  •   _DEFAULT_WAIT_TIMEOUT = 2 #seconds
  •   def __init__(self, queue, task_gen, timeout=2):
  •     super(SubmitterThread, self).__init__()
  •     self.queue = queue
  •     if not isinstance(timeout, int):
  •       _logger.error('Thread wait timeout value error: %s, '
  •              'use default instead.' % timeout)
  •       self.timeout = self._DEFAULT_WAIT_TIMEOUT
  •     self.timeout = timeout
  •     self.task_generator = task_gen
  •  
  •   def run(self):
  •     while True:
  •       try:
  •         task = self.task_generator.next()
  •         self.queue.put(task, True, self.timeout)
  •       except Queue.Full:
  •         _logger.debug('Task queue is full. %s wait %d second%s timeout' %
  •                (self.name, self.timeout, 's' if (self.timeout > 1) else ''))
  •         break
  •       except (StopIteration, ValueError) as e:
  •         _logger.debug('Task finished')
  •         break
  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • #Worker线程实现,主要就是try块内的func调用
  • class WorkerThread(threading.Thread):
  •   _DEFAULT_WAIT_TIMEOUT = 2 #seconds
  •   def __init__(self, queue, timeout=2):
  •     super(WorkerThread, self).__init__()
  •     self.queue = queue
  •     if not isinstance(timeout, int):
  •       _logger.error('Thread wait timeout value error: %s, '
  •              'use default instead.' % timeout)
  •       self.timeout = self._DEFAULT_WAIT_TIMEOUT
  •     self.timeout = timeout
  •  
  •   def run(self):
  •     while True:
  •       try:
  •         func = self.queue.get(True, self.timeout)
  •       except Queue.Empty:
  •         _logger.debug('Task queue is empty. %s wait %d second%s timeout' %
  •                (self.name, self.timeout, 's' if (self.timeout > 1) else ''))
  •         break
  •  
  •       if not callable(func):
  •         time.sleep(1)
  •       try:
  •         func()
  •       except Exception as e:
  •         _logger.error('Thread %s running occurs error: %s' %
  •                (self.name, e))
  •         print('Thread running error: %s' % e)
  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • class Executor(object):
  •   """
  •   The really place to execute executor
  •   """
  •   thread_list = []
  •   submitters = 0
  •   workers = 0
  •   queue = None
  •   task_generator = None
  •   timeout = 0
  •   def __init__(self, task_gen, submitters=1, workers=1 , timeout=2):
  •     if len(self.thread_list) != 0:
  •       raise RuntimeError('Executor can only instance once.')
  •     self.queue = Queue.Queue(maxsize=submitters * 2 + workers * 2)
  •     self.submitters = submitters
  •     self.workers = workers
  •     self.task_generator = task_gen
  •     self.timeout = timeout
  •  
  •   def start(self):
  •     for i in range(self.submitters):
  •       submitter = SubmitterThread(self.queue, self.task_generator, self.timeout)
  •       self.thread_list.append(submitter)
  •       submitter.setName('Submitter-%d' % i)
  •       submitter.setDaemon(True)
  •       submitter.start()
  •     for i in range(self.workers):
  •       worker = WorkerThread(self.queue, self.timeout)
  •       self.thread_list.append(worker)
  •       worker.setName('Worker-%d' % i)
  •       worker.setDaemon(True)
  •       worker.start()
  •  
  •   def is_alive(self):
  •     alive = False
  •     for t in self.thread_list:
  •       if t.isAlive():
  •         alive = True
  •         break
  •     return alive
  •  
  •   def wait_to_shutdown(self):
  •     _logger.debug('Start to wait to shutdown')
  •     for t in self.thread_list:
  •       t.join()
  •       _logger.debug('Shutdown thread : %s' % t.name)
  • Executor类保存了线程池,提供相应接口。有了这个抽象之后,只需要实例化Executor类的对象,然后调用start方法进行多线程任务的运行。并可以用is_alive等接口再主线程内进行其他处理。

    后续再使用这个抽象进行实际多线程任务的实现。

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持开心学习网。

    原文链接:https://blog.csdn.net/u010487568/article/details/52081875

    上一篇下一篇

    猜您喜欢

    热门推荐