python解决线程同步方案有哪些
admin
2023-07-15 07:02:05
0
 先提到线程同步是个什么,概念是什么,就是线程通讯中通过使用某种技术访问数据时,而一旦此线程访问到,其他线程也就不能访问了,直到该线程对数据完成操作才结束。
     Event事件是一种实现方式:通过内部的标记看看是不是变化,也就是true or false了,
     将set(),clear(),is_set(),为true,wait(timeout=None)此种设置true的时长,等到返回false,不等到超时返回false,无线等待为None,

     来看一个wait的使用:
from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def A(event:Event, interval:int):
    while not event.wait(interval): # 要么true  or false
        logging.info('hello')

e = Event()
Thread(target=A, args=(e, 3)).start()

e.wait(8)
e.set()
print('end--------------')

使用锁Lock解决数据资源在争抢,从而使资源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默认阻塞,阻塞设置超时时间,非阻塞,timeout禁止使用,成功获取锁,返回True,否则False。
有阻塞就有释放 ,解开锁,release(),从线程释放锁,上锁的锁重置为unloced未上锁调用,抛出RuntimeError异常。

import threading
from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'

logging.basicConfig(format=FORMAT, level=logging.INFO)


cups = []
lock = Lock()

def worker(count=10):
    logging.info("I'm working for U.")
    flag = False
    while True:
    lock.acquire() # 获取锁

    if len(cups) >= count:
        flag = True

    time.sleep(0.0001) 
    if not flag:
        cups.append(1)

    if flag:
        break

logging.info('I finished. cups = {}'.format(len(cups)))

for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

使用锁的过程中,总是不经意加上锁,出现死锁的产生,出现了死锁,如何解决呢?
使用try finally 将锁释放,另一种使用with上下文管理。
锁的使用场景在于应该少用锁,还要就是如若上锁,将锁的使用时间缩短,避免时间太长而出现无法释放锁的结果。

可重入锁Lock,


import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('------------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
#print(lock.acquire(blocking=False, timeout=10)) # 异常
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock)) # 注意观察lock对象的信息
lock.release()
#lock.release() #多了一次
print('===============')
print()

print(lock.acquire(blocking=False)) # 1次
#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨线程了,异常
lock.release()
print('~~~~~~~~~~~~~~~~~')
print()

# 测试多线程
print(lock.acquire())

def sub(l):
    print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞
    print('{}: {}'.format(threading.current_thread(), l.acquire(False)))
    print('lock in sub thread {}'.format(lock))
    l.release()
    print('sub 1')
    l.release()
    print('sub 2')
    # l.release() # 多了一次

threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象
print('++++++++++++++++++++++')
print()

print(lock.acquire())

lock.release()
time.sleep(5)
print('释放主线程锁')
lock.release()

使用构造方法Condition(lock=None),默认是Rloc,
具体方法为;
acquire(*args),获取锁
wait(self,timeout=None),等待或超时
notify(n=1),唤醒线程,没有等待就没有任何操作,指线程
notify_all(),唤醒所有等待的线程。
Condition主要用于生产者和消费者模型中,解决匹配的问题。
使用方式:先获取acquire,使用完了要释放release,避免死锁最好使用with上下文;生产者和消费者可以使用notify and notify_all。

如下例子:

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
            self.event.set()

    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("recieved {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

这里代码会有缺陷:优化如下:

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## 此例只是为了演示,不考虑线程安全问题

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = Event() # event只是为了使用方便,与逻辑无关
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0,100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
                self.event.wait(1) # 模拟产生数据速度
                self.event.set()

    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait() # 阻塞等通知
                logging.info("received {}".format(self.data))
            self.event.wait(0.5) # 模拟消费的速度

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
# 增加消费者

for i in range(5):
    c = Thread(target=d.consume, name='consumer-{}'.format(i))
    c.start()
p.start()

Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),构建barrier对象,timeout未指定的默认值;
n_waiting ,当前barrier等待的线程数。;
parties ,需要等待
wait(timeout=None),wait方法设置超时并超时发送,barrie处于broken状态。
而broken的状态方法:
broken,打开状态,返回true;
abort(),barrier在broken状态中,wait等待的线程会抛出BrokenBarrierError异常,直到reset恢复barrier;
reset(),恢复barrier,重新开始拦截。
barrier不做演示:

还有semaphore信号量,每次acquire时,都会减一,到0时的线程再到release后,大于0,恢复阻塞的线程。
方法:
Semaphore(value=1) 构造方法,alue小于0,抛ValueError异常;
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True;
release() 释放信号量,计数器加1。
使用信号量处理时,需要注意release超界问题,边界问题,其实,在使用中,python有GIL的存在,有的多线程就变成线程安全的,注意一点,但实际上它们并不是线程安全类型。因此我们在使用中要具体场景具体分析具体使用。

相关内容

热门资讯

特朗普:正致力于与伊朗达成协议... 特朗普在《纽约邮报》一档播客访谈节目中称,他正与伊朗磋商一项协议,伊朗已同意不再谋求拥有核武器。他表...
不接壤的日菲为何偷划海界? 日菲近日发表联合声明,宣称就“划定两国专属经济区和大陆架的海洋边界”启动正式谈判。两个隔海相望的国家...
凤凰晚报丨从钳工到老戏骨,魏宗... 今日人物【从钳工到老戏骨,魏宗万用一生诠释“戏比天大”】6月1日,表演艺术家魏宗万在上海逝世,享年8...
科威特称伊朗袭击致63人受伤 科威特卫生部门3日称,伊朗当天对科威特的袭击已造成63人受伤,相关部门已启动紧急应对预案,并在全国范...
日本标榜“和平国家”却行扩军备... 今年是东京审判开庭80周年,世界正回望历史、反思战争罪责、捍卫二战后来之不易的国际秩序之际,日本却迈...
浙江杨梅即将大规模上市,如何破... “我们现在的压力很大。”5月底,浙江余姚杨梅产区丈亭镇副镇长林宇站在一片杨梅林前对第一财经表示,当地...
致5死2伤!韩国就韩华航空航天... 【环球网报道 记者 姜蔼玲】据韩联社6月1日报道,针对位于韩国大田的韩华航空航天公司发生爆炸致7人伤...
黄河科技学院2026年招生简章 长按图片识别二维码或点击 “阅读原文” 查看电子招生简章。
医路起航,从“心” 开始!黄河... 6月1日上午,黄河科技学院附属医院2022级临床医学本科实习生入院岗前培训在大医讲堂顺利举办。院领导...
问题居然在实体卡槽上!美版iP... 6月2日消息,日前,又有博主提前把还没发布的iPhone 18 Pro电池参数给曝光了出来,根据爆料...