基于redis的分布式锁

avatar
作者
筋斗云
阅读量:0

 目前业务组新参与了一个项目,涉及用户积分的增、删、改、查、核销等操作,预计上线后 QPS 约为 20k。我们使用基于 Redis 的分布式锁来保证数据的一致性。

@Slf4j @Service public class RedisLockService {     /**      * 加锁lua脚本      */     private static final String REDIS_LOCK_LUA_SCRIPT = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then  return redis.call('expire',KEYS[1],ARGV[2])  else return 0 end";     /**      * 解锁lua脚本      */     private static final String REDIS_UN_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";      /**      * 数据库操作key      */     private static final String DB_OPERATION_KEY_PREFIX = "operation_key_";     @Resource     private RedisClient redisClient;       /**      * 添加DB数据库锁      *      * @param userId 用户名      * @return 加锁成功之后,返回sequenceId      */     public String dbLock(int userId) {         String sequenceId = String.valueOf(System.nanoTime());         if (lock(DB_OPERATION_KEY_PREFIX + userId, sequenceId, 10, TimeUnit.SECONDS)) {             return sequenceId;         } else {             recordOne("redis_lock_service_db_lock_fail");             return null;         }     }      /**      * 解数据库锁      *      * @param userId     用户名      * @param sequenceId 序列号      * @return 返回是否解锁成功      */     public boolean dbUnlock(int userId, String sequenceId) {          boolean result = unLock(DB_OPERATION_KEY_PREFIX + userId, sequenceId);         if (!result) {             recordOne("redis_lock_service_db_un_lock_fail");         }         return result;     }      /**      * 添加分布式锁      *      * @param key        锁的key      * @param sequenceId 唯一Id      * @param expireTime 过期时间      * @param unit       过期时间单位      * @return 返回是否加锁成功      */     public boolean lock(String key, String sequenceId, int expireTime, TimeUnit unit) {         long start = System.currentTimeMillis();          try {             if (tryLock(key, sequenceId, expireTime, unit, 0)) {                 return true;             } else {                 log.error("添加分布式锁失败:key={}", key);                 recordOne("redis_lock_service_lock_fail");                
 return false;             }         } catch (Exception e) {             log.error("添加分布式锁异常:key={}", key, e);             recordOne("redis_lock_service_lock_exception");         } finally {             recordQuantile("redis_lock_service_lock", System.currentTimeMillis() - start);         }         return false;     }      public boolean lock(String key, String sequenceId, int expireTime, TimeUnit unit, int waitTimeInMills) {         long start = System.currentTimeMillis();          try {             if (tryLock(key, sequenceId, expireTime, unit, waitTimeInMills)) {                 return true;             } else {                 log.error("添加分布式锁失败:key={}", key);                 recordOne("redis_lock_service_lock_fail");                 return false;             }         } catch (Exception e) {             log.error("添加分布式锁异常:key={}", key, e);             recordOne("redis_lock_service_lock_exception");         } finally {             recordQuantile("redis_lock_service_lock", System.currentTimeMillis() - start);         }         return false;     }        public String lockWithWaitTime(String key, int expireTime, int waitTimeInMills) {         long start = System.currentTimeMillis();         String sequenceId = String.valueOf(System.nanoTime());         if (tryLock(DB_OPERATION_KEY_PREFIX + key, sequenceId, expireTime, TimeUnit.SECONDS, waitTimeInMills)) {             recordQuantile("redis_lockNew_service_lock", System.currentTimeMillis() - start);             return sequenceId;         } else {             log.error("lockNew 添加分布式锁失败:key={}", key);             recordOne("redis_lockNew_service_lock_fail");             return null;         }     }      /**      * 解分布式锁      *      * @param key        解锁key      * @param sequenceId 唯一的一个Id      * @return 返回解锁是否成功      */     public boolean unLock(String key, String sequenceId) {         long start = System.currentTimeMillis();          try {             long result = redisClient.getSession(key, 
true).getSingleConnectionExecutor().evalAutoSha(REDIS_UN_LOCK_LUA_SCRIPT, ResultType.INTEGER, new String[]{key}, new String[]{sequenceId}).get(500, TimeUnit.MILLISECONDS);             if (result > 0) {                 return true;             } else {                 log.error("解锁失败:key={}", key);                 recordOne("redis_lock_service_un_lock_fail");                 return false;             }         } catch (Exception e) {             recordOne("redis_lock_service_un_lock_exception");             log.error("解锁异常:key={}", key, e);         } finally {             recordQuantile("redis_lock_service_un_lock", System.currentTimeMillis() - start);         }         return false;     }      /**      * Redis 尝试加锁      *      * @param key              加锁的Key      * @param expireTime       过期时间      * @param timeUnit         时间单位      * @param waitTimeInMills 等待时间      * @return 返回加锁是否成功      */     private boolean tryLock(final String key, String sequenceId, int expireTime, TimeUnit timeUnit, int waitTimeInMills) {         log.info("redis_lock_service_tryLock_add, userId={}, sequenceId={}", key, sequenceId);         long start = System.currentTimeMillis();         long expireTimeSecond = timeUnit.toSeconds(expireTime);          try {             while (redisClient.getSession(key, true).getSingleConnectionExecutor().evalAutoSha(REDIS_LOCK_LUA_SCRIPT, ResultType.INTEGER, new String[]{key}, new String[]{sequenceId, String.valueOf(expireTimeSecond)}).get(500, TimeUnit.MILLISECONDS) <= 0) {                 long time = System.currentTimeMillis() - start;                 if (time > waitTimeInMills) {                     log.error("尝试添加分布式锁失败:key={} time:{}, 超时返回", key, time);                     log.info("redis_lock_service_tryLock_timeout_fail, userId={}, sequenceId={}", key, sequenceId);                     recordOne("redis_lock_service_tryLock_timeout_fail");                     return false;                 }                 log.info("尝试添加分布式锁成功:key={} time:{}, 等待中", 
key, time);                 TimeUnit.MILLISECONDS.sleep(10);             }             return true;         } catch (Exception e) {             Long unlockResult = null;             try {                 unlockResult = redisClient.getSession(key, true).getSingleConnectionExecutor().evalAutoSha(REDIS_UN_LOCK_LUA_SCRIPT, ResultType.INTEGER, new String[]{key}, new String[]{sequenceId}).get(500, TimeUnit.MILLISECONDS);             } catch (Exception exception) {                 log.error("尝试添加分布式锁失败后解锁异常:key={}", key, e);                 recordOne("redis_lock_service_try_lock_unlock_exception");             }             log.error("尝试添加分布式锁失败:key={}, 解锁结果:{}", key, unlockResult, e);             recordOne("redis_lock_service_try_lock_exception");         } finally {             recordQuantile("redis_lock_service_try_lock", System.currentTimeMillis() - start);         }         log.info("redis_lock_service_tryLock_fail, userId={}, sequenceId={}", key, sequenceId);         return false;     } } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!