From 2446fa626b45733681bbc446dd52b098345b50ec Mon Sep 17 00:00:00 2001 From: Mario Luo Date: Sun, 31 May 2020 22:51:10 +0800 Subject: [PATCH] =?UTF-8?q?:art:=20#1592=20=E5=AE=9E=E7=8E=B0=E7=AE=80?= =?UTF-8?q?=E5=8D=95=E7=9A=84redis=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81=20R?= =?UTF-8?q?edisTemplateSimpleDistributedLock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/redis/RedisTemplateWxRedisOps.java | 7 +- .../RedisTemplateSimpleDistributedLock.java | 126 ++++++++++++++++++ ...edisTemplateSimpleDistributedLockTest.java | 79 +++++++++++ .../result/WxPaySendRedpackResultTest.java | 8 +- 4 files changed, 213 insertions(+), 7 deletions(-) create mode 100644 weixin-java-common/src/main/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLock.java create mode 100644 weixin-java-common/src/test/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLockTest.java diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/redis/RedisTemplateWxRedisOps.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/redis/RedisTemplateWxRedisOps.java index 652cec84a1..19d4046c92 100644 --- a/weixin-java-common/src/main/java/me/chanjar/weixin/common/redis/RedisTemplateWxRedisOps.java +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/redis/RedisTemplateWxRedisOps.java @@ -1,11 +1,12 @@ package me.chanjar.weixin.common.redis; +import lombok.NonNull; import lombok.RequiredArgsConstructor; +import me.chanjar.weixin.common.util.locks.RedisTemplateSimpleDistributedLock; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public class RedisTemplateWxRedisOps implements WxRedisOps { @@ -37,7 +38,7 @@ public void expire(String key, int expire, TimeUnit timeUnit) { } @Override - public Lock getLock(String key) { - return new ReentrantLock(); + public Lock getLock(@NonNull String key) { + return new RedisTemplateSimpleDistributedLock(redisTemplate, key, 60 * 1000); } } diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLock.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLock.java new file mode 100644 index 0000000000..dfac1c28fb --- /dev/null +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLock.java @@ -0,0 +1,126 @@ +package me.chanjar.weixin.common.util.locks; + +import lombok.Getter; +import lombok.NonNull; +import org.jetbrains.annotations.NotNull; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.script.RedisScript; +import org.springframework.data.redis.core.types.Expiration; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * 实现简单的redis分布式锁, 支持重入, 不是红锁 + * + * @see reids distlock + */ +public class RedisTemplateSimpleDistributedLock implements Lock { + + @Getter + private final StringRedisTemplate redisTemplate; + @Getter + private final String key; + @Getter + private final int leaseMilliseconds; + + private final ThreadLocal valueThreadLocal = new ThreadLocal<>(); + + public RedisTemplateSimpleDistributedLock(@NonNull StringRedisTemplate redisTemplate, int leaseMilliseconds) { + this(redisTemplate, "lock:" + UUID.randomUUID().toString(), leaseMilliseconds); + } + + public RedisTemplateSimpleDistributedLock(@NonNull StringRedisTemplate redisTemplate, @NonNull String key, int leaseMilliseconds) { + if (leaseMilliseconds <= 0) { + throw new IllegalArgumentException("Parameter 'leaseMilliseconds' must grate then 0: " + leaseMilliseconds); + } + this.redisTemplate = redisTemplate; + this.key = key; + this.leaseMilliseconds = leaseMilliseconds; + } + + @Override + public void lock() { + while (!tryLock()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + } + + @Override + public void lockInterruptibly() throws InterruptedException { + while (!tryLock()) { + Thread.sleep(1000); + } + } + + @Override + public boolean tryLock() { + String value = valueThreadLocal.get(); + if (value == null || value.length() == 0) { + value = UUID.randomUUID().toString(); + valueThreadLocal.set(value); + } + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + List redisResults = redisTemplate.executePipelined(new RedisCallback() { + @Override + public String doInRedis(RedisConnection connection) throws DataAccessException { + connection.set(keyBytes, valueBytes, Expiration.milliseconds(leaseMilliseconds), RedisStringCommands.SetOption.SET_IF_ABSENT); + connection.get(keyBytes); + return null; + } + }); + Object currentLockSecret = redisResults.size() > 1 ? redisResults.get(1) : redisResults.get(0); + return currentLockSecret != null && currentLockSecret.toString().equals(value); + } + + @Override + public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { + long waitMs = unit.toMillis(time); + boolean locked = tryLock(); + while (!locked && waitMs > 0) { + long sleep = waitMs < 1000 ? waitMs : 1000; + Thread.sleep(sleep); + waitMs -= sleep; + locked = tryLock(); + } + return locked; + } + + @Override + public void unlock() { + if (valueThreadLocal.get() != null) { + // 提示: 必须指定returnType, 类型: 此处必须为Long, 不能是Integer + RedisScript script = new DefaultRedisScript("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end", Long.class); + redisTemplate.execute(script, Arrays.asList(key), valueThreadLocal.get()); + valueThreadLocal.remove(); + } + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + /** + * 获取当前锁的值 + * return 返回null意味着没有加锁, 但是返回非null值并不以为着当前加锁成功(redis中key可能自动过期) + */ + public String getLockSecretValue() { + return valueThreadLocal.get(); + } +} diff --git a/weixin-java-common/src/test/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLockTest.java b/weixin-java-common/src/test/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLockTest.java new file mode 100644 index 0000000000..50a17ed94b --- /dev/null +++ b/weixin-java-common/src/test/java/me/chanjar/weixin/common/util/locks/RedisTemplateSimpleDistributedLockTest.java @@ -0,0 +1,79 @@ +package me.chanjar.weixin.common.util.locks; + +import lombok.SneakyThrows; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.testng.Assert.*; + +@Test(enabled = false) +public class RedisTemplateSimpleDistributedLockTest { + + RedisTemplateSimpleDistributedLock redisLock; + + StringRedisTemplate redisTemplate; + + AtomicInteger lockCurrentExecuteCounter; + + @BeforeTest + public void init() { + JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); + connectionFactory.setHostName("127.0.0.1"); + connectionFactory.setPort(6379); + connectionFactory.afterPropertiesSet(); + StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); + this.redisTemplate = redisTemplate; + this.redisLock = new RedisTemplateSimpleDistributedLock(redisTemplate, 60000); + this.lockCurrentExecuteCounter = new AtomicInteger(0); + } + + @Test(description = "多线程测试锁排他性") + public void testLockExclusive() throws InterruptedException { + int threadSize = 100; + final CountDownLatch startLatch = new CountDownLatch(threadSize); + final CountDownLatch endLatch = new CountDownLatch(threadSize); + + for (int i = 0; i < threadSize; i++) { + new Thread(new Runnable() { + @SneakyThrows + @Override + public void run() { + startLatch.await(); + + redisLock.lock(); + assertEquals(lockCurrentExecuteCounter.incrementAndGet(), 1, "临界区同时只能有一个线程执行"); + lockCurrentExecuteCounter.decrementAndGet(); + redisLock.unlock(); + + endLatch.countDown(); + } + }).start(); + startLatch.countDown(); + } + endLatch.await(); + } + + @Test + public void testTryLock() throws InterruptedException { + assertTrue(redisLock.tryLock(3, TimeUnit.SECONDS), "第一次加锁应该成功"); + assertNotNull(redisLock.getLockSecretValue()); + String redisValue = this.redisTemplate.opsForValue().get(redisLock.getKey()); + assertEquals(redisValue, redisLock.getLockSecretValue()); + + redisLock.unlock(); + assertNull(redisLock.getLockSecretValue()); + redisValue = this.redisTemplate.opsForValue().get(redisLock.getKey()); + assertNull(redisValue, "释放锁后key会被删除"); + + redisLock.unlock(); + } + + +} + diff --git a/weixin-java-pay/src/test/java/com/github/binarywang/wxpay/bean/result/WxPaySendRedpackResultTest.java b/weixin-java-pay/src/test/java/com/github/binarywang/wxpay/bean/result/WxPaySendRedpackResultTest.java index 206703b850..cb9007828f 100644 --- a/weixin-java-pay/src/test/java/com/github/binarywang/wxpay/bean/result/WxPaySendRedpackResultTest.java +++ b/weixin-java-pay/src/test/java/com/github/binarywang/wxpay/bean/result/WxPaySendRedpackResultTest.java @@ -1,10 +1,10 @@ package com.github.binarywang.wxpay.bean.result; -import org.testng.*; -import org.testng.annotations.*; - import com.thoughtworks.xstream.XStream; import me.chanjar.weixin.common.util.xml.XStreamInitializer; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; /** * The type Wx pay send redpack result test. @@ -68,6 +68,6 @@ public void loadFailureResult() { Assert.assertEquals("FAIL", wxMpRedpackResult.getReturnCode()); Assert.assertEquals("FAIL", wxMpRedpackResult.getResultCode()); Assert.assertEquals("onqOjjmM1tad-3ROpncN-yUfa6uI", wxMpRedpackResult.getReOpenid()); - Assert.assertEquals(1, wxMpRedpackResult.getTotalAmount()); + Assert.assertEquals(Integer.valueOf(1), wxMpRedpackResult.getTotalAmount()); } }