当前位置:脚本大全 > > 正文

python分割字符串要用哪一个语句(python使用threading.Condition交替打印两个字符)

时间:2021-10-07 00:19:14类别:脚本大全

python分割字符串要用哪一个语句

python使用threading.Condition交替打印两个字符

python中使用threading.condition交替打印两个字符的程序。

这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.condition对象就能够完成两个线程之间的这种协调工作。

threading.condition默认情况下会通过持有一个reentrantlock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。

python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。

通过代码和其中的注释,能够非常明白地弄清楚condition的原理是怎样的:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • import threading
  • import time
  • import functools
  •  
  •  
  • def worker(cond, name):
  •  """worker running in different thread"""
  •  with cond: # 通过__enter__方法,获取cond对象中的锁,默认是一个reentrantlock对象
  •   print('...{}-{}-{}'.format(name, threading.current_thread().getname(), cond._is_owned()))
  •   cond.wait() # 创建一个新的锁newlock,调用acquire将newlock获取,然后将newlock放入等待列表中,\
  •   # 释放cond._lock锁(_release_save),最后再次调用acquire让newlock阻塞
  •  print('wait returned in {}'.format(name))
  •  
  •  
  • if __name__ == '__main__':
  •  condition = threading.condition()
  •  t1 = threading.thread(target=functools.partial(worker, condition, 't1'))
  •  t2 = threading.thread(target=functools.partial(worker, condition, 't2'))
  •  
  •  t2.start() # 启动线程2
  •  t1.start() # 启动线程1
  •  
  •  time.sleep(2)
  •  with condition:
  •   condition.notify(1) # 按照fifo顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除
  •  
  •  time.sleep(2)
  •  
  •  with condition:
  •   condition.notify(1) # 按照fifo顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除
  •  
  •  t1.join() # 主线程等待子线程结束
  •  t2.join() # 主线程等待子线程结束
  •  
  •  print('all done')
  • 其输出为:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • ...t2-thread-2-true
  • ...t1-thread-1-true
  • wait returned in t2
  • wait returned in t1
  • all done
  • 其中wait方法要求获取到threading.condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(newlock),并获取这个newlock;之后调用_release_save方法释放threading.condition对象中的锁,让其它线程能够获取到;最后再次调用newlock上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个newlock给release掉,当然,这就是notify方法的责任了。

    notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。

    在理解了threading.condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • import threading
  • import functools
  • import time
  •  
  •  
  • def print_a(state):
  •  while true:
  •   if state.closed:
  •    print('close a')
  •    return
  •   print('a')
  •   time.sleep(2)
  •   state.set_current_is_a(true)
  •   state.wait_for_b()
  •  
  •  
  • def print_b(state):
  •  while true:
  •   if state.closed:
  •    print('close b')
  •    return
  •   state.wait_for_a()
  •   print('b')
  •   time.sleep(2)
  •   state.set_current_is_a(false)
  •  
  •  
  • if __name__ == '__main__':
  •  class state(object):
  •   """state used to coordinate multiple(two here) threads"""
  •   def __init__(self):
  •    self.condition = threading.condition()
  •    self.current_is_a = false
  •    self.closed = false
  •  
  •   def wait_for_a(self):
  •    with self.condition:
  •     while not self.current_is_a:
  •      self.condition.wait()
  •  
  •   def wait_for_b(self):
  •    with self.condition:
  •     while self.current_is_a:
  •      self.condition.wait()
  •  
  •   def set_current_is_a(self, flag):
  •    self.current_is_a = flag
  •    with self.condition:
  •     self.condition.notify_all()
  •  
  •  
  •  state = state()
  •  t1 = threading.thread(target=functools.partial(print_a, state))
  •  t2 = threading.thread(target=functools.partial(print_b, state))
  •  
  •  try:
  •   t1.start()
  •   t2.start()
  •  
  •   t1.join()
  •   t2.join()
  •  except keyboardinterrupt:
  •   state.closed = true
  •   print('closed')
  • 可以看到有两种类型的任务,一个用于打印字符a,一个用于打印字符b,我们的实现种让a先于b打印,所以在print_a中,先打印a,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是false,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符a,而b永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。

    考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符a,以后,每隔2秒钟,就会同时输出a和b,而不是交替输出。原因在于,由于current_is_a还是false,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为true,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到b的打印循环,故最终我们看到a和b总是一起打印。

    可见对于threading.condition的使用需要多加小心,要注意逻辑上的严谨性。

    附一个队列版本:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • import threading
  • import functools
  • import time
  • from queue import queue
  •  
  •  
  • def print_a(q_a, q_b):
  •  while true:
  •   char_a = q_a.get()
  •   if char_a == 'closed':
  •    return
  •   print(char_a)
  •   time.sleep(2)
  •   q_b.put('b')
  •  
  •  
  • def print_b(q_a, q_b):
  •  while true:
  •   char_b = q_b.get()
  •   if char_b == 'closed':
  •    return
  •   print(char_b)
  •   time.sleep(2)
  •   q_a.put('a')
  •  
  •  
  • if __name__ == '__main__':
  •  q_a = queue()
  •  q_b = queue()
  •  
  •  t1 = threading.thread(target=functools.partial(print_a, q_a, q_b))
  •  t2 = threading.thread(target=functools.partial(print_b, q_a, q_b))
  •  
  •  try:
  •   t1.start()
  •   t2.start()
  •  
  •   q_a.put('a')
  •  
  •   t1.join()
  •   t2.join()
  •  except keyboardinterrupt:
  •   q_a.put('closed')
  •   q_b.put('closed')
  •  
  •  print('done')
  • 队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。 

    附一个协程版本(python 3.5+):

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • import time
  • import asyncio
  •  
  •  
  • async def print_a():
  •  while true:
  •   print('a')
  •   time.sleep(2) # simulate the cpu block time
  •   await asyncio.sleep(0) # release control to event loop
  •  
  •  
  • async def print_b():
  •  while true:
  •   print('b')
  •   time.sleep(2) # simulate the cpu block time
  •   await asyncio.sleep(0) # release control to event loop
  •  
  •  
  • async def main():
  •  await asyncio.wait([print_a(), print_b()])
  •  
  •  
  • if __name__ == '__main__':
  •  loop = asyncio.get_event_loop()
  •  loop.run_until_complete(main())
  • 协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过async def将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于io密集型操作,而没有明显的cpu阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。

    附一个java版本:

    printmain类,用于管理和协调打印a和打印b的两个线程:

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • package com.cuttyfox.tests.self.version1;
  •  
  • import java.util.concurrent.executorservice;
  • import java.util.concurrent.executors;
  • import java.util.concurrent.timeunit;
  •  
  • public class printmain {
  •  private boolean currentisa = false;
  •  
  •  public synchronized void waitingforprintinga() throws interruptedexception {
  •   while (this.currentisa == false) {
  •    wait();
  •   }
  •  }
  •  
  •  public synchronized void waitingforprintingb() throws interruptedexception {
  •   while (this.currentisa == true) {
  •    wait();
  •   }
  •  }
  •  
  •  public synchronized void setcurrentisa(boolean flag) {
  •   this.currentisa = flag;
  •   notifyall();
  •  }
  •  
  •  public static void main(string[] args) throws exception {
  •   printmain state = new printmain();
  •   executorservice executorservice = executors.newcachedthreadpool();
  •   executorservice.execute(new printb(state));
  •   executorservice.execute(new printa(state));
  •   executorservice.shutdown();
  •   executorservice.awaittermination(10, timeunit.seconds);
  •   system.out.println("done");
  •   system.exit(0);
  •  }
  • }
  • 打印a的线程(首先打印a):

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • package com.cuttyfox.tests.self.version1;
  •  
  • import java.util.concurrent.timeunit;
  •  
  • public class printa implements runnable{
  •  private printmain state;
  •  
  •  public printa(printmain state) {
  •   this.state 标签:

    猜您喜欢