由于大多数程序不需要有多线程的能力,所以在Python启动的时候并不支持多线程。也就是说,Python中支持多线程所需要的数据结构特别是GIL并没有创建。当Python虚拟机启动的时候,多线程处理并没有打开,而仅仅支持单线程。只有当程序中使用了如thread.start_new_thread等方法的时候,python才知道需要有多线程处理的支持,此时,python虚拟机才会自动创建多线程处理所需要的数据结构与GIL。
生成和终止线程(由于thread模块比较低级,不被推荐使用,所以就不说了)
1 使用threading.Thread类(构造函数:Thread(group = None, target = None, name = None, args = (), kwargs = {})
使用threading模块来创建线程是很简单的。简单地说,只要继承threading.Thread,然后在__init__方法中,调用threading.Thread类的__init__方法,重写类的run方法就可以了。
import threadingclass MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): pass thread = MyThread()thread.start()
程序主要是创建一个继承自threading.Thread的类,然后实例化此对象,并调用相应的方法来生成线程。
这是一个简单的例子,大家可以看一下:
import threadingimport timeclass MyThread(threading.Thread): def __init__(self, index, create_time): threading.Thread.__init__(self) self.index = index self.create_time = create_time def run(self): time.sleep(1) print (time.time() - self.create_time), "\t", self.index print "Thread %d exit..." %(self.index)for index in range(5): thread = MyThread(index, time.time()) thread.start()print "main thread exit..."
main thread exit...1.0011651516 0Thread 0 exit...1.00121307373 1Thread 1 exit...1.00119805336 2Thread 2 exit...1.0011780262 3Thread 3 exit...1.00115704536 4Thread 4 exit...
threading中Thread类的常用方法
start() 开始运行生成的线程实例 run() 重载此方法,作为线程的运行部分 join() 等待线程的结束 is_alive() 如果线程是活动的,返回True,否则返回False name 线程名称。这个字符串用于唯一识别。 t.ident 整数线程标示符。如果线程尚未启动,他的值为None t.daemon 线程的布尔型后台标志,要在start()之前设置这个标志。 getName(老) 返回线程的名字 setName(老) 设置线程的名字 isAlive(老) 查看线程是否还是活动的 isDaemon(老) 返回线程的是否后台运行标志 setDaemon(老) 设置线程的后台运行标志现成的名字不但可以通过setName设置,还可以通过Thread类的__init__构造函数中设置。如果不设置线程的名字,系统将使用如Thread-N的名字
class MyThread(threading.Thread): def __init__(self, index, create_time, thread_name): threading.Thread.__init__(self, name = thread_name) self.index = index self.create_time = create_time
2 管理线程
在一个线程的生命周期中,会在不同的状态间切换。在任意时刻,线程总是处于某个线程状态中。一共四种状态;就绪状态/运行状态/休眠状态/终止状态
1)主线程对子线程的控制
在上面的例子中,打印信息“main thread exit...“一般出现在打印信息(如“Thread 4 exit..)之前。也就是说,在主线程生成了子线程之后 ,主线程将继续执行,而不会等待子线程的结束。但很多时候,需要等待子线程完成以后,主线程再继续执行。这可以通过join方法来实现
threads = []for index in range(5): thread = MyThread(index, time.time()) thread.start() threads.append(thread)for thread in threads: thread.join()print "main thread exit... "
1.00116205215 0Thread 0 exit...1.00103282928 3Thread 3 exit...1.00128912926 1Thread 1 exit...1.00128412247 2Thread 2 exit...1.00122499466 4Thread 4 exit...main thread exit...
join方法还有一个超时参数(timeout)。如果线程没有正常退出或者通过某个异常退出,且超时的情况下,主线程就不再等待子线程了。但是join只能返回None。因此当有超时参数的时候,无法判断线程是否结束。(isAlive)
使用join的时候,注意事项:
1)在超时参数不存在的情况下,join操作将会一直阻塞,知道线程终2)一个线程可以多次使用join方法3)线程不能在自己的运行代码中调用join方法,否则会发生死锁4)在线程调用start之前,调用join方法会发生错误2) 线程中的局部变量
有时候需要在每个线程中使用自己独立的变量。这时,就需要threading.local。
import threading import random, timeclass MyThread(): def __init__(self): self.local = threading.local() def run(self): time.sleep(random.random()) self.local.number = [] for i in range(10): self.local.number.append(random.choice(range(10))) print threading.currentThread(), self.local.numberthreadlocal = MyThread()threads = []for i in range(5): t = threading.Thread(target = threadlocal.run) t.start() threads.append(t)for i in range(5): threads[i].join
[3, 0, 6, 7, 7, 8, 6, 8, 1, 0] [6, 0, 6, 9, 1, 7, 3, 6, 1, 2] [1, 6, 6, 4, 6, 9, 7, 0, 2, 8] [6, 3, 5, 4, 6, 5, 3, 4, 2, 9] [3, 5, 7, 9, 3, 1, 4, 3, 9, 0]
3 线程间的同步
由于同一进程的所有线程都是共享数据的,如果对线程中数据的访问不加限制,后果将不可预期,甚至会造成死锁。为了解决这个问题,需要允许线程独占性的访问共享数据,这就是线程同步。注意,这些问题在进程中也是存在的,只是多线程环境更加常见,因此后面介绍的解决线程间据同步问题的方法,对进程也是适用的。常见的同步机制有四种:锁机制 信号量 条件变量 同步队列,每种机制都有其各自的优缺点,读者可以根据自己的需求做出选择。
1) 临界资源与临界区
临界资源指的是一次只允许一个线程访问的资源,包括打印机等硬件资源和互斥变量等一类的软件资源。对于临界资源的共享只能采取互斥的方式。也就是说,一个线程访问的时候,其他线程必须等待。且线程间不能交替使用该资源。一般的,线程中访问临界资源的代码部分叫做临界区。因此,临界区的代码不能同时执行。
import threadingimport timeclass Counter: def __init__(self): self.value = 0 def increment(self): self.value = self.value + 1 value = self.value return valuecounter = Counter()class ThreadDome(threading.Thread): def __init__(self, index, create_time): threading.Thread.__init__(self) self.index = index self.create_time = create_time def run(self): time.sleep(2) value = counter.increment() print (time.time() - self.create_time), "\t", self.index, "\tvalue:", valuefor index in range(100): thread = ThreadDome(index, time.time()) thread.start()
2.00218892097 0 value: 1 2.00226593018 1 value: 2 2.00224900246 2 value: 3 2.00223207474 3 value: 4 2.00221896172 4 value: 5 2.00221991539 5 value: 6 2.00221586227 6 value: 7 2.0022149086 7 value: 8 ...... 2.00189685822 89 value: 81 2.00181698799 90 value: 82 2.00173687935 91 value: 83 2.0016720295 92 value: 84 2.00155591965 94 value: 85 2.00128602982 96 value: 86 2.0012691021 97 value: 87 2.00124001503 98 value: 88 2.00123310089 99 value: 89 2.00206184387 93 value: 90 2.00195908546 95 value: 91
这里结果只给除了一小部分。对于产生的100个线程而言,计数器最终将到达100,但实际并非如此,代码运行多次,虽然结果不一定如上所示,但是有一个共同点,Count类的value基本没有达到100.这是因为没有对临界区进行互斥访问造成的。为了能够对临界区加以区别对待,需要在原来的代码中加入临界区的进入部分和离开不分。
在实际操作中,对于临界区的访问需要遵循如下一些访问原则: (1)空闲让进 (2)忙则等待 (3)有限等待 (4)让权等待2) 锁机制
数据之间同步最简单的方法就是使用锁机制。这是最底层的数据同步原语,一个锁总是处于已锁和未锁状态,因此提供两种操作:加锁解锁来改变锁的状态。对于一个锁来说,如果是未锁的状态,线程在进入临界区前将使用加锁操作将锁的状态变为已锁。当临界区代码执行完毕,可以使用解锁操作将锁状态变为未锁。如果需要使用临界资源的线程发现锁的状态为已锁的时候,必须阻塞等待,知道锁状态变为未锁。
上个例子代码中,临界区是:
self.value = self.value + 1 value = self.value
为解决这个问题,可将代码改为
import threadingimport timeclass Counter: def __init__(self): self.value = 0 self.lock = threading.Lock() def increment(self): self.lock.acquire() self.value = self.value + 1 value = self.value self.lock.release() return valuecounter = Counter()class ThreadDome(threading.Thread): def __init__(self, index, create_time): threading.Thread.__init__(self) self.index = index self.create_time = create_time def run(self): time.sleep(2) value = counter.increment() print (time.time() - self.create_time), "\t", self.index, "\tvalue:", valuefor index in range(100): thread = ThreadDome(index, time.time()) thread.start()
2.00218892097 0 value: 12.00224304199 1 value: 22.00222706795 2 value: 32.00221204758 3 value: 42.00219988823 4 value: 52.00220489502 5 value: 6 ........2.00217819214 92 value: 932.00214791298 93 value: 942.00211596489 94 value: 952.00210595131 95 value: 962.00208497047 96 value: 972.0018529892 99 value: 982.00212788582 97 value: 992.00206494331 98 value: 100
上面结果虽然也是无序的,但是value最终增加到了100.
3)条件变量
虽然锁机制可以解决一些数据同步问题,但是这只是最低层次的同步。当线程变多,且其关系复杂时候,就需要另一种同步机值---“条件变量“,使用这种机制,能够使的只有在特定的条件下,才能够对临界区进行访问。条件变量通过允许线程阻塞和等待线程发送信号的方式弥补了锁机制的不足。
python中的Condition类提供了这种机制的实现。由于条件变量同样支持锁机制,所以其中也提供了acquire和release方法。除此之外,还提供了wait,notify,notify_all等常用方法。调用wait方法将使线程处于阻塞状态,她有一个可选参数,用于指定超时时间,如果不指定超时时间,线程将一直阻塞,知道被notify或者notify_all唤醒。 生产者和消费者问题是一个非常著名和经典的同步问题。网上这种例子很多,大家动手查查,这里就不赘述了4) 信号量
信号量主要是用在需要对有限的资源进行同步的时候。信号量内部维护了对于资源的一个计算器,用来表示还可用的资源数。在python中,Semaphore类提供了这种同步机值的实现。类提供了acquire和release这两个方法。调用acquire方法的时候,如果内部计数器大于0,则将其减少1并返回。如果内部计数器等于0,则阻塞此线程,知道有线程使用release方法将内部计数器更新到大于1.而release方法就是将内部计数器+1.同时,如果有线程在等待资源,则将其唤醒。
一个简单的例子,假设资源的数目为5,则用如下语句构造信号量
max_resource = 5res_sema = Semaphore(value = max_resource)
当有多线程需要使用此资源时,则在每次使用资源的前后加上对信号量的操作:
res_sema.acquire()#使用此资源res_sema.release()
5) 同步队列
尽管threading模块提供了上面这些同步机制,但是在实践中,最容易处理的是同步队列Queue。这是专门为多线程访问所设计的数据结构,能够有效实现线程对资源的访问。
Queue模块中有一个Queue类。其构造函数中可以指定一个maxsize值,当maxsize值小于或等于0的时候,表示对队列的长度没有限制。当大与0的时候,则制定了队列的长度。当队列的长度达到最大值而又有新线程要加入队列的时候,需要等待。Queue有很多方法,但是比较重要的还是put和get方法。put方法将需要完成的任务放入队列。而get方法则是从队列中获取任务。Queue模块定义了3中不同的队列类:
(1) Queue([maxsize]) 创建一个FIFO队列,maxsize是对列可以放入的最大项目数 ,如果省略maxsize参数或将他置为0,队列大小将无限大(2) LifoQueue([maxsize]) 创建一个LIFO队列(栈)(3) PriorityQueue([maxsize]) 创建一个优先队列,其中项目按照优先级排好。使用这个队列时,项目应该是(priority, data)形式的元组,其中priority是数字。队列类的实例q有以下方法:
q.qsize()q.empty()q.full()q.put(itm[, block[, timeout]]): 将item放入队列,如果block参数为Ture(默认),调用者将被阻塞知道对列中出现可用的空闲位置为止。否则(False)队列满时,将引发Full异常。timeout提供可选的超时值。如果出现超时,引发Full异常q.get([block[, timeout]])(用法类似上面) 从对列中删除一项,然后返回这个项目。如果可选参数block为True(默认值),调用者将阻塞,直到队列中有可用数据。否则(False),队列为空时,将引发异常Empty。timeout提供可选的超时值,单位为秒,当超时时,将引发Empty异常。q.put_nowait(item)=q.put(item, False)q.get_nowait() = q.get(0)q.task_done():队列中数据的使用者用来只是对于项目的处理已经结束。如果使用此方法,那么从对列中删除的每一项都应该调用一次。q.join():阻塞直到队列中所有项目均被删除或者处理为止。一旦队列中的每一项都调用一次q.task_done()方法,此方法将会直接返回。import threading, Queue, time , randomclass Worker(threading.Thread): def __init__(self, index, queue): threading.Thread.__init__(self) self.index = index self.queue = queue def run(self): while 1: time.sleep(random.random()) item = self.queue.get() if item is None: break print "index:', self.index,"task", item, "finished" self.queue.task_done()queue = Queue.Queue(0)for i in range(2): Worker(i, queue).start()for i in range(10): queue.put(i)for i in range(2): queue.put(None)
index: 1 task 0 finishedindex: 1 task 1 finishedindex: 1 task 2 finishedindex: 0 task 3 finishedindex: 1 task 4 finishedindex: 0 task 5 finishedindex: 0 task 6 finishedindex: 0 task 7 finishedindex: 0 task 8 finishedindex: 1 task 9 finished
多线程的数据同步管理非常复杂,特别是进程较多和关系错乱的情况下,所以在管理比较多的线程的时候,可以采用一些已有的框架,如Twisted等。在实现多线程的时候,最好能权衡以下,由此带来的性能提升和复杂性所带来的维护开销。
Ps:这些事我看的《Python 开发技术详解》(周伟等)的一些记录,希望对大家有帮助,如有不详细的,查阅此书