Redis實(shí)現(xiàn)庫(kù)存扣減的解決方案防止商品超賣(mài)
Redis 如何實(shí)現(xiàn)庫(kù)存扣減操作?如何防止商品被超賣(mài)?
基于數(shù)據(jù)庫(kù)單庫(kù)存 基于數(shù)據(jù)庫(kù)多庫(kù)存 基于redis 基于redis實(shí)現(xiàn)扣減庫(kù)存的具體實(shí)現(xiàn) 初始化庫(kù)存回調(diào)函數(shù)(IStockCallback) 扣減庫(kù)存服務(wù)(StockService)。
在日常開(kāi)發(fā)中有很多地方都有類(lèi)似扣減庫(kù)存的操作,比如電商系統(tǒng)中的商品庫(kù)存,抽獎(jiǎng)系統(tǒng)中的獎(jiǎng)品庫(kù)存等。
解決方案
1. 使用mysql數(shù)據(jù)庫(kù)
使用一個(gè)字段來(lái)存儲(chǔ)庫(kù)存,每次扣減庫(kù)存去更新這個(gè)字段。
2. 還是使用數(shù)據(jù)庫(kù)
但是將庫(kù)存分層多份存到多條記錄里面,扣減庫(kù)存的時(shí)候路由一下,這樣子增大了并發(fā)量,但是還是避免不了大量的去訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)來(lái)更新庫(kù)存。
3. 將庫(kù)存放到redis使用redis的incrby特性來(lái)扣減庫(kù)存。
分析
在上面的第一種和第二種方式都是基于數(shù)據(jù)來(lái)扣減庫(kù)存。
[基于數(shù)據(jù)庫(kù)單庫(kù)存]
第一種方式在所有請(qǐng)求都會(huì)在這里等待鎖,獲取鎖有去扣減庫(kù)存。在并發(fā)量不高的情況下可以使用,但是一旦并發(fā)量大了就會(huì)有大量請(qǐng)求阻塞在這里,導(dǎo)致請(qǐng)求超時(shí),進(jìn)而整個(gè)系統(tǒng)雪崩;而且會(huì)頻繁的去訪(fǎng)問(wèn)數(shù)據(jù)庫(kù),大量占用數(shù)據(jù)庫(kù)資源,所以在并發(fā)高的情況下這種方式不適用。
[基于數(shù)據(jù)庫(kù)多庫(kù)存]
第二種方式其實(shí)是第一種方式的優(yōu)化版本,在一定程度上提高了并發(fā)量,但是在還是會(huì)大量的對(duì)數(shù)據(jù)庫(kù)做更新操作大量占用數(shù)據(jù)庫(kù)資源。
基于數(shù)據(jù)庫(kù)來(lái)實(shí)現(xiàn)扣減庫(kù)存還存在的一些問(wèn)題:
用數(shù)據(jù)庫(kù)扣減庫(kù)存的方式,扣減庫(kù)存的操作必須在一條語(yǔ)句中執(zhí)行,不能先selec在update,這樣在并發(fā)下會(huì)出現(xiàn)超扣的情況。如:
update number set x=x-1 where x > 0
- MySQL自身對(duì)于高并發(fā)的處理性能就會(huì)出現(xiàn)問(wèn)題,一般來(lái)說(shuō),MySQL的處理性能會(huì)隨著并發(fā)thread上升而上升,但是到了一定的并發(fā)度之后會(huì)出現(xiàn)明顯的拐點(diǎn),之后一路下降,最終甚至?xí)葐蝨hread的性能還要差。
- 當(dāng)減庫(kù)存和高并發(fā)碰到一起的時(shí)候,由于操作的庫(kù)存數(shù)目在同一行,就會(huì)出現(xiàn)爭(zhēng)搶InnoDB行鎖的問(wèn)題,導(dǎo)致出現(xiàn)互相等待甚至死鎖,從而大大降低MySQL的處理性能,最終導(dǎo)致前端頁(yè)面出現(xiàn)超時(shí)異常。
[基于redis]
針對(duì)上述問(wèn)題的問(wèn)題我們就有了第三種方案,將庫(kù)存放到緩存,利用redis的incrby特性來(lái)扣減庫(kù)存,解決了超扣和性能問(wèn)題。但是一旦緩存丟失需要考慮恢復(fù)方案。比如抽獎(jiǎng)系統(tǒng)扣獎(jiǎng)品庫(kù)存的時(shí)候,初始庫(kù)存=總的庫(kù)存數(shù)-已經(jīng)發(fā)放的獎(jiǎng)勵(lì)數(shù),但是如果是異步發(fā)獎(jiǎng),需要等到MQ消息消費(fèi)完了才能重啟redis初始化庫(kù)存,否則也存在庫(kù)存不一致的問(wèn)題。
基于redis實(shí)現(xiàn)扣減庫(kù)存的具體實(shí)現(xiàn)
- 我們使用redis的lua腳本來(lái)實(shí)現(xiàn)扣減庫(kù)存
- 由于是分布式環(huán)境下所以還需要一個(gè)分布式鎖來(lái)控制只能有一個(gè)服務(wù)去初始化庫(kù)存
- 需要提供一個(gè)回調(diào)函數(shù),在初始化庫(kù)存的時(shí)候去調(diào)用這個(gè)函數(shù)獲取初始化庫(kù)存
[初始化庫(kù)存回調(diào)函數(shù)(IStockCallback )]
/** * 獲取庫(kù)存回調(diào) * @author yuhao.wang */ public interface IStockCallback { /** * 獲取庫(kù)存 * @return */ int getStock(); }
[扣減庫(kù)存服務(wù)(StockService)]
/** * 扣庫(kù)存 * * @author yuhao.wang */ @Service public class StockService { Logger logger = LoggerFactory.getLogger(StockService.class); /** * 不限庫(kù)存 */ public static final long UNINITIALIZED_STOCK = -3L; /** * Redis 客戶(hù)端 */ @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 執(zhí)行扣庫(kù)存的腳本 */ public static final String STOCK_LUA; static { /** * * @desc 扣減庫(kù)存Lua腳本 * 庫(kù)存(stock)-1:表示不限庫(kù)存 * 庫(kù)存(stock)0:表示沒(méi)有庫(kù)存 * 庫(kù)存(stock)大于0:表示剩余庫(kù)存 * * @params 庫(kù)存key * @return * -3:庫(kù)存未初始化 * -2:庫(kù)存不足 * -1:不限庫(kù)存 * 大于等于0:剩余庫(kù)存(扣減之后剩余的庫(kù)存) * redis緩存的庫(kù)存(value)是-1表示不限庫(kù)存,直接返回1 */ StringBuilder sb = new StringBuilder(); sb.append("if (redis.call('exists', KEYS[1]) == 1) then"); sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));"); sb.append(" local num = tonumber(ARGV[1]);"); sb.append(" if (stock == -1) then"); sb.append(" return -1;"); sb.append(" end;"); sb.append(" if (stock >= num) then"); sb.append(" return redis.call('incrby', KEYS[1], 0 - num);"); sb.append(" end;"); sb.append(" return -2;"); sb.append("end;"); sb.append("return -3;"); STOCK_LUA = sb.toString(); } /** * @param key 庫(kù)存key * @param expire 庫(kù)存有效時(shí)間,單位秒 * @param num 扣減數(shù)量 * @param stockCallback 初始化庫(kù)存回調(diào)函數(shù) * @return -2:庫(kù)存不足; -1:不限庫(kù)存; 大于等于0:扣減庫(kù)存之后的剩余庫(kù)存 */ public long stock(String key, long expire, int num, IStockCallback stockCallback) { long stock = stock(key, num); // 初始化庫(kù)存 if (stock == UNINITIALIZED_STOCK) { RedisLock redisLock = new RedisLock(redisTemplate, key); try { // 獲取鎖 if (redisLock.tryLock()) { // 雙重驗(yàn)證,避免并發(fā)時(shí)重復(fù)回源到數(shù)據(jù)庫(kù) stock = stock(key, num); if (stock == UNINITIALIZED_STOCK) {// 獲取初始化庫(kù)存final int initStock = stockCallback.getStock();// 將庫(kù)存設(shè)置到redisredisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);// 調(diào)一次扣庫(kù)存的操作stock = stock(key, num); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } } return stock; } /** * 加庫(kù)存(還原庫(kù)存) * * @param key 庫(kù)存key * @param num 庫(kù)存數(shù)量 * @return */ public long addStock(String key, int num) { return addStock(key, null, num); } /** * 加庫(kù)存 * * @param key 庫(kù)存key * @param expire 過(guò)期時(shí)間(秒) * @param num 庫(kù)存數(shù)量 * @return */ public long addStock(String key, Long expire, int num) { boolean hasKey = redisTemplate.hasKey(key); // 判斷key是否存在,存在就直接更新 if (hasKey) { return redisTemplate.opsForValue().increment(key, num); } Assert.notNull(expire,"初始化庫(kù)存失敗,庫(kù)存過(guò)期時(shí)間不能為null"); RedisLock redisLock = new RedisLock(redisTemplate, key); try { if (redisLock.tryLock()) { // 獲取到鎖后再次判斷一下是否有key hasKey = redisTemplate.hasKey(key); if (!hasKey) { // 初始化庫(kù)存 redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } return num; } /** * 獲取庫(kù)存 * * @param key 庫(kù)存key * @return -1:不限庫(kù)存; 大于等于0:剩余庫(kù)存 */ public int getStock(String key) { Integer stock = (Integer) redisTemplate.opsForValue().get(key); return stock == null ? -1 : stock; } /** * 扣庫(kù)存 * * @param key 庫(kù)存key * @param num 扣減庫(kù)存數(shù)量 * @return 扣減之后剩余的庫(kù)存【-3:庫(kù)存未初始化; -2:庫(kù)存不足; -1:不限庫(kù)存; 大于等于0:扣減庫(kù)存之后的剩余庫(kù)存】 */ private Long stock(String key, int num) { // 腳本里的KEYS參數(shù) List<String> keys = new ArrayList<>(); keys.add(key); // 腳本里的ARGV參數(shù) List<String> args = new ArrayList<>(); args.add(Integer.toString(num)); long result = redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集群模式和單機(jī)模式雖然執(zhí)行腳本的方法一樣,但是沒(méi)有共同的接口,所以只能分開(kāi)執(zhí)行 // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args); } // 單機(jī)模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args); } return UNINITIALIZED_STOCK; } }); return result; } }
[調(diào)用]
/** * @author yuhao.wang */ @RestController public class StockController { @Autowired private StockService stockService; @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object stock() { // 商品ID long commodityId = 1; // 庫(kù)存ID String redisKey = "redis_key:stock:" + commodityId; long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId)); return stock >= 0; } /** * 獲取初始的庫(kù)存 * * @return */ private int initStock(long commodityId) { // TODO 這里做一些初始化庫(kù)存的操作 return 1000; } @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object getStock() { // 商品ID long commodityId = 1; // 庫(kù)存ID String redisKey = "redis_key:stock:" + commodityId; return stockService.getStock(redisKey); } @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Object addStock() { // 商品ID long commodityId = 2; // 庫(kù)存ID String redisKey = "redis_key:stock:" + commodityId; return stockService.addStock(redisKey, 2); } }
結(jié)語(yǔ)
到此這篇關(guān)于Redis如何實(shí)現(xiàn)庫(kù)存扣減操作?如何防止商品被超賣(mài)?的文章就介紹到這了,更多相關(guān)Redis庫(kù)存扣減內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。