博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
redis分布式锁小试
阅读量:6974 次
发布时间:2019-06-27

本文共 8121 字,大约阅读时间需要 27 分钟。

一、场景

  项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。

二、解决思路

  将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。

三、主要代码

import java.lang.annotation.ElementType;import java.lang.annotation.Inherited;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;/*** @author: hujunzheng* @create: 17/9/29 下午2:49*/@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inheritedpublic @interface RedisLock {    /**     * redis的key     * @return     */    String value();    /**     * 持锁时间,单位毫秒,默认一分钟     */    long keepMills() default 60000;    /**     * 当获取失败时候动作     */    LockFailAction action() default LockFailAction.GIVEUP;        public enum LockFailAction{        /**         * 放弃         */        GIVEUP,        /**         * 继续         */        CONTINUE;    }    /**     * 睡眠时间,设置GIVEUP忽略此项     * @return     */    long sleepMills() default 500;}

 

import java.lang.reflect.Method;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.aspectj.lang.annotation.Aspect;import org.aspectj.lang.annotation.Pointcut;import org.aspectj.lang.reflect.MethodSignature;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/*** @author: hujunzheng* @create: 17/9/29 下午2:49*/@Component@Aspectpublic class RedisLockAspect {    private static final Log log = LogFactory.getLog(RedisLockAspect.class);    @Autowired    private RedisCacheTemplate.RedisLockOperation redisLockOperation;    @Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" +            "&& @annotation(me.ele.api.portal.service.redis.RedisLock)")    private void lockPoint(){}    @Around("lockPoint()")    public Object arround(ProceedingJoinPoint pjp) throws Throwable{        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();        Method method = methodSignature.getMethod();        RedisLock lockInfo = method.getAnnotation(RedisLock.class);             /*         String lockKey = lockInfo.value();         if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {            DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];            lockKey += deployMessage.getEnv();            System.out.println(lockKey);         }     */        boolean lock = false;        Object obj = null;        while(!lock){            long timestamp = System.currentTimeMillis()+lockInfo.keepMills();            lock = setNX(lockInfo.value(), timestamp);            //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)            long now = System.currentTimeMillis();            if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){                log.info("得到redis分布式锁...");                obj = pjp.proceed();                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){                    releaseLock(lockInfo.value());                }            }else{                if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){                    log.info("稍后重新请求redis分布式锁...");                    Thread.currentThread().sleep(lockInfo.sleepMills());                }else{                    log.info("放弃redis分布式锁...");                    break;                }            }        }        return obj;    }    private boolean setNX(String key,Long value){        return redisLockOperation.setNX(key, value);    }    private long getLock(String key){        return redisLockOperation.get(key);    }    private Long getSet(String key,Long value){        return redisLockOperation.getSet(key, value);    }    private void releaseLock(String key){        redisLockOperation.delete(key);    }    @Pointcut(value = "execution(* me.ele..StargateBuildMessageConsumer.consumeStargateBuildMessage(me.ele.api.portal.service.mq.dto.BuildMessage)) && args(buildMessage)" +            "&& @annotation(me.ele.api.portal.service.redis.RedisLock)", argNames = "buildMessage")    private void buildMessageLockPoint(BuildMessage buildMessage){}    @Around(value = "buildMessageLockPoint(buildMessage)", argNames = "pjp,buildMessage")    public Object buildMessageAround(ProceedingJoinPoint pjp, BuildMessage buildMessage) throws Throwable {        final String LOCK = buildMessage.getAppId() + buildMessage.getPushSequence();        Lock lock = redisLockRegistry.obtain(LOCK);        try {            lock.lock();            return pjp.proceed();        } finally {            try {                lock.unlock();            } catch (Exception e) {                log.error("buildMessage={}, Lock {} unlock failed. {}", buildMessage, lock, e);            }        }    }}

四、遇到的问题

  

 

  

  开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。

   在一篇文章中找到了原因:

  只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。

  参考: 

五、使用spring-redis中的RedisLockRegistry

import java.util.concurrent.locks.Lock;import org.springframework.integration.redis.util.RedisLockRegistry;@Beanpublic RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,                                           RedisTemplate redisTemplate) {    return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);}Lock lock = redisLockRegistry.obtain(appId);lock.tryLock(180, TimeUnit.SECONDS);....lock.unlock();  

六、参考

  其他工具类,请参考。

七、springboot LockRegistry

  

      

 

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;import java.util.concurrent.locks.Lock;import org.junit.Before;import org.junit.Ignore;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;import org.springframework.integration.redis.util.RedisLockRegistry;import redis.clients.jedis.JedisShardInfo;@Ignorepublic class RedisLockTest {  private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);  private static final String LOCK = "xxx.xxx";  private RedisLockRegistry redisLockRegistry;  @Before  public void setUp() {    JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");    JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);    redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);  }  private class TaskA implements Runnable {    @Override    public void run() {      try {        Thread.sleep(1000);      } catch (InterruptedException e) {        e.printStackTrace();      }      Lock lock = redisLockRegistry.obtain(LOCK);      try {        lock.lock();        LOGGER.info("Lock {} is obtained", lock);        Thread.sleep(10);        lock.unlock();        LOGGER.info("Lock {} is unlocked", lock);      } catch (Exception ex) {        LOGGER.error("Lock {} unlock failed", lock, ex);      }    }  }  private class TimeoutTask implements Runnable {    @Override    public void run() {      Lock lock = redisLockRegistry.obtain(LOCK);      try {        lock.lock();        LOGGER.info("Lock {} is obtained", lock);        Thread.sleep(5000);        lock.unlock();        LOGGER.info("Lock {} is unlocked", lock);      } catch (Exception ex) {        LOGGER.error("Lock {} unlock failed", lock, ex);      }    }  }  @Test  public void test() throws InterruptedException, TimeoutException {    ExecutorService service = Executors.newFixedThreadPool(2);    service.execute(new TimeoutTask());    service.execute(new TaskA());    service.shutdown();    if (!service.awaitTermination(1, TimeUnit.MINUTES)) {      throw new TimeoutException();    }  }}

 

 

 

转载地址:http://skrsl.baihongyu.com/

你可能感兴趣的文章
《无人机DIY》——3.2 大疆Phantom 2 Vision+
查看>>
《Flink官方文档》Python 编程指南测试版(二)
查看>>
Linux有问必答:如何在VMware ESXi虚拟机上设置静态MAC地址
查看>>
《Unity 游戏案例开发大全》一6.1 背景以及功能概述
查看>>
《C++代码设计与重用》——2.6 接口一致性
查看>>
《AngularJS高级程序设计》——2.4 小结
查看>>
Spark Streaming + Spark SQL 实现配置化ETL流程
查看>>
算法之冒泡排序
查看>>
袋鼠云成企业服务潜力新星 云市场生态持续发酵
查看>>
synchronized 修饰在 static方法和非static方法的区别
查看>>
如何把握住视频直播的风口,打造安全可靠的视频直播平台
查看>>
jquery.form.js失效问题。
查看>>
jetty 基础
查看>>
码栈开发手册(三)---编码方式开发(高级课程②)
查看>>
Hbase 学习(十) HBase Snapshots
查看>>
记一次8小时惊心动魄的服务器+网站升级
查看>>
too many open files
查看>>
Eclispe清除项目缓存无需删除.metadata文件
查看>>
Fedora14升级到Fedora15问题汇总(原创)
查看>>
Oracle数据库语句大全
查看>>