Django中的并发一致性问题

  • pqdong 

1. Django中的多线程问题

简单的说就是服务端监听 socket 每次 accept 一个新的请求后,开一个线程处理这个socket 客户连接。这里可以参考知乎的一个回答:https://www.zhihu.com/question/56472691

2. 悲观锁,乐观锁

在web开发中真正保证数据一致性的操作就是对数据库的读写加锁,毕竟一致性问题的本质就在于对数据的操作上。

须要用到锁机制, 而MySQL中的锁如果按不同的粒度或者不同的级别来划分的话, 会得到不同的结果.
如果按锁粒度来分的话, 会有表级索行级锁页级锁
如果按锁级别来划分的话, 会有共享锁排它锁.

  • 共享锁可以称之为读锁, 为读取操作时创建的锁. 可以并发读, 但是不允许任何事务对数据进行修改, 除非读操作结束, 释放了共享锁. 使用SELECT ... LOCK IN SHARE MODE;来实现读锁, 这样一来所有的线程度到的数据是一致的.
  • 排它锁可以称之为写锁, 如果某个事务对数据加上了排它锁, 那么所有的事务都不能再在这些数据上添加任何的锁, 直到前一个事务结束. 使用SELECT ... FOR UPDATE;语句来实现.

在处理并发数据一致性的问题时, 常常会以使用方式来划分, 即乐观锁悲观锁.

悲观锁:在对数据修改之前, 首先会对该数据加上排它锁, 然后修改数据, 事务结束时释放锁. 如果加锁失败了的话, 说明有其他的事务对该数据进行了加锁操作, 此时可以等待, 也可以抛出异常, 具体的响应方式由开发人员决定.

乐观锁:在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做.

另外一个就是Innodb中默认为行锁, 像like这种查询是扫全表的, 不管加没加索引都是全表扫描.

# 事务1
begin;
SELECT * FROM auth_user where username like "%3%" for update;

# 事务2 
begin;
SELECT * FROM auth_user where id = 1 for update;

这两个悲观锁是有冲突的, 因为事务1实际上是锁了整张表; 另一一个就是如果where语句中的字段没有加索引的话, 本身MySQL也会扫全表进行匹配. 并且, 如果这这个上面儿添加了悲观锁, 也会锁全表.

# 事务1
begin;
SELECT * FROM auth_user where date_joined = "2018-09-10 02:30:28.104106" for update;

# 事务2
begin;
SELECT * FROM auth_user where date_joined = "2018-09-05 07:12:14.790511" for update;

这两个事务的date_joined完全不同, 但是其中一个事务还是被阻塞了. 所以, SELECT ... FOR UPDATE;必须要走索引, 锁全表的代码实在是太大.

# 事务1
begin;
SELECT * FROM auth_user where id = 1 for update;

# 事务2
begin;
SELECT * FROM auth_user where id = 2 for update;

这样一来两个事务的数据互不相干, 对并发的效率影响不会太大. 总之不能锁全表.

1.2 乐观锁

乐观锁比较乐观, 不会先把资源攥在手里面儿. 所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做.

乐观锁并不需要数据库提供额外的支持, 通常是由业务层通过逻辑的手段来进行控制. 在数据库内部update同一行的时候是不允许并发的,即数据库每次执行一条update语句时会获取被update行的写锁,直到这一行被成功更新后才释放. 因此在业务操作进行前获取需要锁的数据的当前版本号,然后实际更新数据时再次对比版本号确认与之前获取的相同,并更新版本号,即可确认这之间没有发生并发的修改. 如果更新失败即可认为老版本的数据已经被并发修改掉而不存在了,此时认为获取锁失败,需要回滚整个业务操作并可根据需要重试整个过程.

也就是说, 乐观锁的控制并不需要在事务中实现. 来看一下无锁的并发数据问题:

Alt text

为什么要在update之前先select呢? 业务需要.
很明显的第二个select语句所持有的数据在线程A执行完update语句之后为旧数据, 以旧数据来做更新很有可能出现问题. 解决这个问题的办法就是上面所说的乐观锁: 每次更新时都将库中的version数据+1, 另一个线程在做执行update语句之前首先检查自己所持有的version数据与库中的version数据是否相同, 若相同才进行更新. 不相同的话重新进行select.
那有没有可能确认了版本号之后, 刚准备执行更新操作的时候另外一个线程更新了数据库? 这种情况完全有可能出现, 这个也是乐观锁的局限性. 通常来讲我们并不会单独的使用version版本控制, 而是与事务一起使用.
如果把上面的两个线程执行操作放到两个事务中进行执行的话, 是不会出现上述问题的.

1.3 并发锁小结
  • 悲观锁: 用于保证数据的强一致性, 但并发性能可能会有一些影响.
  • 乐观锁: 用于取锁失败概率比较低的场景, 也就是说该数据为读多写少, 能够提升并发性能.

在并发这个话题下, 没有两全其美的技术, 既能够保证并发的效率, 同时也能够保证数据的强一致性. 只能根据自身的业务场景, 来选择最佳解决方案.

2. Django处理并发数据

对于读操作而言, MySQL默认使用快照读, 如果使用select...for update读, 会比较影响效率. 所以一般的读操作直接放过.

对于更新的操作. 要么直接使用update进行更新, 要么使用select..for update获取排它锁.

2.1 update直接更新

语法也比较简单, 以Django中更新用户密码为例:

User.object.filter(id=1).update(password=make_password(“123456”))

错误的做法:

user = User.objects.get(id=1)
user.password = make_password("123456")
user.save()

这样的写法在并发的情况下会代码无尽的麻烦, 虽然密码这种东西的确是不会有并发的操作, 但是商品的库存呢?

商品的减库存如果不用分布式锁的方式进行更新的话, 那么就需要用到F函数:

update product set store_number = store_number – 1 where id = 1;

这种更新需要用到原有的数据, 所以这个时候F函数就派上用场了.

from django.db.models import F  

Product.objects.filter(id=1).update(store_number=F("store_number") - 1)

来看一下这条更新所生成的SQL语句吧, 因为我这里没有Product模型, 以User模型为例好了:

User.objects.filter(id=1).update(first_name = F("first_name") + 1)

{'sql': 'UPDATE `auth_user` SET `first_name` = (`auth_user`.`first_name` + 1) WHERE `auth_user`.`id` = 1', 'time': '0.001'}

可以看到上述更新所执行的SQL是没有问题的. 执行的SQL语句可以使用connection.queries得到.

2.2 select..for update更新

select..for update必须在事务中执行, 否则没有任何意义. 而事务在Django中也有很好的封装:

from django.db import transaction

with transaction.atomic():
    do_something...

正式的编码实现:

from django.db import transaction

with transaction.atomic():
    # 调用该语句时id=1的数据首先会尝试获取排它锁, 若拿到了锁, 则继续执行; 没拿到, 阻塞等待.
    # 由于实在事务中获取的排它锁, 所以该锁会在事务提交之前一直存在, 保证该数据不会被修改, 因为其它的update语句拿不到排它锁.
    user = User.objects.select_for_update().get(id=1)
    user.password = make_password("123456")
    user.save()

想要验证上述代码没有的话用一个断点调试, 然后再在另一个进程中尝试修改同样的数据, 观察结果即可.
这样一来不管有多少个进程对同一个数据进行修改, 最终只能有一个成功, 并且使用select_for_update方法所获取的数据总是最新的.

3. 总结

数据在并发情况下的更新也就只有这么多, 具体选择哪种方式仍然需要针对自身的业务特性来选取: 只进行数据的更新, 直接使用update方式; 需要频繁的获取数据库中最新数据并更新, 则使用select..for update方式, 只不过此时的并发效率会有所降低, 毕竟持有排它锁的时间要比直接的update更长. 并且select..for update如果没有走索引的话, 将会锁全表, 从而带来毁灭性的延迟与阻塞.

4 .例子

from django.db import transaction

                '''悲观锁下订单(存在事务管理)'''
                class TradeCommitView(View):

                    @transaction.atomic   #装饰器的方法实现事务管理
                    def post(self,request):
                        '''设置保存点用于事务管理'''
                        sid1 = transaction.savepoint()

                        user = request.user
                        if not user.is_authenticated():
                            '''判断是否登陆'''
                            return redirect(reverse('car:index'))
                        # 接收数据
                        car_id = request.POST.get('car_id')
                        pay_method = request.POST.get('pay_style')
                        address_id = request.POST.get('address')
                        from datetime import datetime
                        order_id = str(user.id) + datetime.now().strftime('%Y%m%d%H%M%S')
                        # 运费(元)
                        transport = 5000
                        if not all([car_id,pay_method,address_id]):
                            return JsonResponse({'errmsg':'数据内容不完整'})
                        
                        try:
                            # car = CarDetail.objects.get(id=car_id)

                            #悲观锁
                            car = CarDetail.objects.select_for_update().get(id=car_id)
                        except:
                            transaction.savepoint_rollback(sid1)
                            return HttpResponse('车辆不存在')

                        service = float(car.car_price) * 10000 * 0.04
                        if service < 3000:
                            service = 3000
                       
                        add = AddressInfo.objects.get(id=address_id)

                        # sid1 = transaction.savepoint()

                        #创建订单
                        try:
                            order_new = OrderInfo.objects.create(
                                order_id = order_id,
                                user = user,
                                add = add,
                                price = car.car_price,
                                service_charge =service,
                                freight =transport,
                                # status = 0,
                                # pay_method =
                                online_pai_method = pay_method
                            )
                            import time
                            # time.sleep(30)
                            if car.status != 1:   #如果车辆不是上线状态
                                transaction.savepoint_rollback(sid1)   #回退到保存点
                                return JsonResponse({'errmsg':'下单失败'})
                            order_car =OrderCar.objects.create(
                                oder=order_new,
                                car_id = car,
                                comment='')
                            car.status = 0
                            car.save()

                        except Exception as e:
                            transaction.savepoint_rollback(sid1)
                            return JsonResponse({'errmsg':e})


                        transaction.savepoint_commit(sid1)
                        return HttpResponse('结束')
'''乐观锁下订单(存在事务管理)'''
        class TradeCommitView_le(View):

            @transaction.atomic
            def post(self, request):
                '''设置保存点用于事务管理'''
                sid1 = transaction.savepoint()

                user = request.user
                if not user.is_authenticated():
                    '''判断是否登陆'''
                    return redirect(reverse('car:index'))
                # 接收数据
                car_id = request.POST.get('car_id')
                pay_method = request.POST.get('pay_style')
                address_id = request.POST.get('address')
                from datetime import datetime
                order_id = str(user.id) + datetime.now().strftime('%Y%m%d%H%M%S')
                # 运费(元)
                transport = 5000
                if not all([car_id, pay_method, address_id]):
                    return HttpResponse('数据内容不完整')
                # 校验数据的正确(作业)
                try:
                    car = CarDetail.objects.get(id=car_id)

                except:
                    pass

                service = float(car.car_price) * 10000 * 0.04
                if service < 3000:
                    service = 3000
              
                add = AddressInfo.objects.get(id=address_id)

                # sid1 = transaction.savepoint()

                # 创建订单
                try:
                    order_new = OrderInfo.objects.create(
                        order_id=order_id,
                        user=user,
                        add=add,
                        price=car.car_price,
                        service_charge=service,
                        freight=transport,
                        # status = 0,
                        # pay_method =
                        online_pai_method=pay_method
                    )



                    #乐观锁。不是真正意思上的锁,只是在更新前查询,如果不符合条件就回滚,符合就继续执行。
                    res = CarDetail.objects.filter(id=car_id,status=1).update(status=0)
                    print('res',res)
                    if res == 0 :
                        transaction.savepoint_rollback(sid1)
                        return HttpResponse('车辆不存在')



                    order_car = OrderCar.objects.create(
                        oder=order_new,
                        car_id=car,
                        comment='')


                except Exception as e:
                    transaction.savepoint_rollback(sid1)
                    return JsonResponse({'errmsg': e})

                transaction.savepoint_commit(sid1)

                return render(request,'trade_pay.html',context={'order_id':order_id})

5.官方文档

select_for_update()

select_for_update(nowait=Falseskip_locked=Falseof=())

Returns a queryset that will lock rows until the end of the transaction, generating a SELECT ... FOR UPDATE SQL statement on supported databases.

For example:

entries = Entry.objects.select_for_update().filter(author=request.user)

All matched entries will be locked until the end of the transaction block, meaning that other transactions will be prevented from changing or acquiring locks on them.

Usually, if another transaction has already acquired a lock on one of the selected rows, the query will block until the lock is released. If this is not the behavior you want, call select_for_update(nowait=True). This will make the call non-blocking. If a conflicting lock is already acquired by another transaction, DatabaseError will be raised when the queryset is evaluated. You can also ignore locked rows by usingselect_for_update(skip_locked=True) instead. The nowait and skip_locked are mutually exclusive and attempts to callselect_for_update() with both options enabled will result in a ValueError.

By default, select_for_update() locks all rows that are selected by the query. For example, rows of related objects specified in select_related() are locked in addition to rows of the queryset’s model. If this isn’t desired, specify the related objects you want to lock in select_for_update(of=(...)) using the same fields syntax as select_related(). Use the value 'self' to refer to the queryset’s model.

You can’t use select_for_update() on nullable relations:

>>> Person.objects.select_related('hometown').select_for_update()
Traceback (most recent call last):
...
django.db.utils.NotSupportedError: FOR UPDATE cannot be applied to the nullable side of an outer join

To avoid that restriction, you can exclude null objects if you don’t care about them:

>>> Person.objects.select_related('hometown').select_for_update().exclude(hometown=None)
<QuerySet [<Person: ...)>, ...]>

Currently, the postgresqloracle, and mysql database backends support select_for_update(). However, MySQL doesn’t support the nowaitskip_locked, and of arguments.

Passing nowait=Trueskip_locked=True, or of to select_for_update() using database backends that do not support these options, such as MySQL, raises a NotSupportedError. This prevents code from unexpectedly blocking.

Evaluating a queryset with select_for_update() in autocommit mode on backends which support SELECT ... FOR UPDATE is aTransactionManagementError error because the rows are not locked in that case. If allowed, this would facilitate data corruption and could easily be caused by calling code that expects to be run in a transaction outside of one.

Using select_for_update() on backends which do not support SELECT ... FOR UPDATE (such as SQLite) will have no effect.SELECT ... FOR UPDATE will not be added to the query, and an error isn’t raised if select_for_update() is used in autocommit mode.

Warning

Although select_for_update() normally fails in autocommit mode, since TestCase automatically wraps each test in a transaction, calling select_for_update() in a TestCase even outside an atomic() block will (perhaps unexpectedly) pass without raising a TransactionManagementError. To properly test select_for_update() you should useTransactionTestCase.

Certain expressions may not be supported

PostgreSQL doesn’t support select_for_update() with Window expressions.

参考:SmartKeyerror的博客和sui_yi123的博客

https://smartkeyerror.com/
https://blog.csdn.net/sui_yi123/article/details/83187149

发表评论

电子邮件地址不会被公开。 必填项已用*标注