go?zero微服務(wù)實(shí)戰(zhàn)性能優(yōu)化極致秒殺
引言
上一篇文章中引入了消息隊(duì)列對(duì)秒殺流量做削峰的處理,我們使用的是Kafka,看起來(lái)似乎工作的不錯(cuò),但其實(shí)還是有很多隱患存在,如果這些隱患不優(yōu)化處理掉,那么秒殺搶購(gòu)活動(dòng)開(kāi)始后可能會(huì)出現(xiàn)消息堆積、消費(fèi)延遲、數(shù)據(jù)不一致、甚至服務(wù)崩潰等問(wèn)題,那么后果可想而知。本篇文章我們就一起來(lái)把這些隱患解決掉。
批量數(shù)據(jù)聚合
在SeckillOrder這個(gè)方法中,每來(lái)一次秒殺搶購(gòu)請(qǐng)求都往往Kafka中發(fā)送一條消息。假如這個(gè)時(shí)候有一千萬(wàn)的用戶同時(shí)來(lái)?yè)屬?gòu),就算我們做了各種限流策略,一瞬間還是可能會(huì)有上百萬(wàn)的消息會(huì)發(fā)到Kafka,會(huì)產(chǎn)生大量的網(wǎng)絡(luò)IO和磁盤IO成本,大家都知道Kafka是基于日志的消息系統(tǒng),寫消息雖然大多情況下都是順序IO,但當(dāng)海量的消息同時(shí)寫入的時(shí)候還是可能會(huì)扛不住。
那怎么解決這個(gè)問(wèn)題呢?答案是做消息的聚合。之前發(fā)送一條消息就會(huì)產(chǎn)生一次網(wǎng)絡(luò)IO和一次磁盤IO,我們做消息聚合后,比如聚合100條消息后再發(fā)送給Kafka,這個(gè)時(shí)候100條消息才會(huì)產(chǎn)生一次網(wǎng)絡(luò)IO和磁盤IO,對(duì)整個(gè)Kafka的吞吐和性能是一個(gè)非常大的提升。其實(shí)這就是一種小包聚合的思想,或者叫Batch或者批量的思想。這種思想也隨處可見(jiàn),比如我們使用Mysql插入批量數(shù)據(jù)的時(shí)候,可以通過(guò)一條SQL語(yǔ)句執(zhí)行而不是循環(huán)的一條一條插入,還有Redis的Pipeline操作等等。
那怎么來(lái)聚合呢,聚合策略是啥呢?聚合策略有兩個(gè)維度分別是聚合消息條數(shù)和聚合時(shí)間,比如聚合消息達(dá)到100條我們就往Kafka發(fā)送一次,這個(gè)條數(shù)是可以配置的,那如果一直也達(dá)不到100條消息怎么辦呢?通過(guò)聚合時(shí)間來(lái)兜底,這個(gè)聚合時(shí)間也是可以配置的,比如配置聚合時(shí)間為1秒鐘,也就是無(wú)論目前聚合了多少條消息只要聚合時(shí)間達(dá)到1秒,那么就往Kafka發(fā)送一次數(shù)據(jù)。聚合條數(shù)和聚合時(shí)間是或的關(guān)系,也就是只要有一個(gè)條件滿足就觸發(fā)。
在這里我們提供一個(gè)批量聚合數(shù)據(jù)的工具Batcher,定義如下
type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup }
Do方法:滿足聚合條件后就會(huì)執(zhí)行Do方法,其中val參數(shù)為聚合后的數(shù)據(jù)
Sharding方法:通過(guò)Key進(jìn)行sharding,相同的key消息寫入到同一個(gè)channel中,被同一個(gè)goroutine處理
在merge方法中有兩個(gè)觸發(fā)執(zhí)行Do方法的條件,一是當(dāng)聚合的數(shù)據(jù)條數(shù)大于等于設(shè)置的條數(shù),二是當(dāng)觸發(fā)設(shè)置的定時(shí)器
代碼實(shí)現(xiàn)比較簡(jiǎn)單,如下為具體實(shí)現(xiàn):
type msg struct { key string val interface{} } type Batcher struct { opts options Do func(ctx context.Context, val map[string][]interface{}) Sharding func(key string) int chans []chan *msg wait sync.WaitGroup } func New(opts ...Option) *Batcher { b := &Batcher{} for _, opt := range opts { opt.apply(&b.opts) } b.opts.check() b.chans = make([]chan *msg, b.opts.worker) for i := 0; i < b.opts.worker; i++ { b.chans[i] = make(chan *msg, b.opts.buffer) } return b } func (b *Batcher) Start() { if b.Do == nil { log.Fatal("Batcher: Do func is nil") } if b.Sharding == nil { log.Fatal("Batcher: Sharding func is nil") } b.wait.Add(len(b.chans)) for i, ch := range b.chans { go b.merge(i, ch) } } func (b *Batcher) Add(key string, val interface{}) error { ch, msg := b.add(key, val) select { case ch <- msg: default: return ErrFull } return nil } func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) { sharding := b.Sharding(key) % b.opts.worker ch := b.chans[sharding] msg := &msg{key: key, val: val} return ch, msg } func (b *Batcher) merge(idx int, ch <-chan *msg) { defer b.wait.Done() var ( msg *msg countint closed bool lastTicker = true interval= b.opts.interval vals = make(map[string][]interface{}, b.opts.size) ) if idx > 0 { interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker))) } ticker := time.NewTicker(interval) for { select { case msg = <-ch: if msg == nil { closed = true break } count++ vals[msg.key] = append(vals[msg.key], msg.val) if count >= b.opts.size { break } continue case <-ticker.C: if lastTicker { ticker.Stop() ticker = time.NewTicker(b.opts.interval) lastTicker = false } } if len(vals) > 0 { ctx := context.Background() b.Do(ctx, vals) vals = make(map[string][]interface{}, b.opts.size) count = 0 } if closed { ticker.Stop() return } } } func (b *Batcher) Close() { for _, ch := range b.chans { ch <- nil } b.wait.Wait() }
使用的時(shí)候需要先創(chuàng)建一個(gè)Batcher,然后定義Batcher的Sharding方法和Do方法,在Sharding方法中通過(guò)ProductID把不同商品的聚合投遞到不同的goroutine中處理,在Do方法中我們把聚合的數(shù)據(jù)一次性批量的發(fā)送到Kafka,定義如下:
b := batcher.New( batcher.WithSize(batcherSize), batcher.WithBuffer(batcherBuffer), batcher.WithWorker(batcherWorker), batcher.WithInterval(batcherInterval), ) b.Sharding = func(key string) int { pid, _ := strconv.ParseInt(key, 10, 64) return int(pid) % batcherWorker } b.Do = func(ctx context.Context, val map[string][]interface{}) { var msgs []*KafkaData for _, vs := range val { for _, v := range vs { msgs = append(msgs, v.(*KafkaData)) } } kd, err := json.Marshal(msgs) if err != nil { logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err) } if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil { logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err) } } s.batcher = b s.batcher.Start()
在SeckillOrder方法中不再是每來(lái)一次請(qǐng)求就往Kafka中投遞一次消息,而是先通過(guò)batcher提供的Add方法添加到Batcher中等待滿足聚合條件后再往Kafka中投遞。
err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId}) if err!= nil { logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err) }
降低消息的消費(fèi)延遲
通過(guò)批量消息處理的思想,我們提供了Batcher工具,提升了性能,但這主要是針對(duì)生產(chǎn)端而言的。當(dāng)我們消費(fèi)到批量的數(shù)據(jù)后,還是需要串行的一條條的處理數(shù)據(jù),那有沒(méi)有辦法能加速消費(fèi)從而降低消費(fèi)消息的延遲呢?有兩種方案分別是:
- 增加消費(fèi)者的數(shù)量
- 在一個(gè)消費(fèi)者中增加消息處理的并行度
因?yàn)樵贙afka中,一個(gè)Topci可以配置多個(gè)Partition,數(shù)據(jù)會(huì)被平均或者按照生產(chǎn)者指定的方式寫入到多個(gè)分區(qū)中,那么在消費(fèi)的時(shí)候,Kafka約定一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi),為什么要這么設(shè)計(jì)呢?我理解的是如果有多個(gè)Consumer同時(shí)消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),那么在操作這個(gè)消費(fèi)進(jìn)度的時(shí)候就需要加鎖,對(duì)性能影響比較大。所以說(shuō)當(dāng)消費(fèi)者數(shù)量小于分區(qū)數(shù)量的時(shí)候,我們可以增加消費(fèi)者的數(shù)量來(lái)增加消息處理能力,但當(dāng)消費(fèi)者數(shù)量大于分區(qū)的時(shí)候再繼續(xù)增加消費(fèi)者數(shù)量就沒(méi)有意義了。
不能增加Consumer的時(shí)候,可以在同一個(gè)Consumer中提升處理消息的并行度,即通過(guò)多個(gè)goroutine來(lái)并行的消費(fèi)數(shù)據(jù),我們一起來(lái)看看如何通過(guò)多個(gè)goroutine來(lái)消費(fèi)消息。
在Service中定義msgsChan,msgsChan為Slice,Slice的長(zhǎng)度表示有多少個(gè)goroutine并行的處理數(shù)據(jù),初始化如下:
func NewService(c config.Config) *Service { s := &Service{ c: c, ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)), OrderRPC:order.NewOrder(zrpc.MustNewClient(c.OrderRPC)), msgsChan:make([]chan *KafkaData, chanCount), } for i := 0; i < chanCount; i++ { ch := make(chan *KafkaData, bufferCount) s.msgsChan[i] = ch s.waiter.Add(1) go s.consume(ch) } return s }
從Kafka中消費(fèi)到數(shù)據(jù)后,把數(shù)據(jù)投遞到Channel中,注意投遞消息的時(shí)候按照商品的id做Sharding,這能保證在同一個(gè)Consumer中對(duì)同一個(gè)商品的處理是串行的,串行的數(shù)據(jù)處理不會(huì)導(dǎo)致并發(fā)帶來(lái)的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題
func (s *Service) Consume(_ string, value string) error { logx.Infof("Consume value: %s\n", value) var data []*KafkaData if err := json.Unmarshal([]byte(value), &data); err != nil { return err } for _, d := range data { s.msgsChan[d.Pid%chanCount] <- d } return nil }
我們定義了chanCount個(gè)goroutine同時(shí)處理數(shù)據(jù),每個(gè)channel的長(zhǎng)度定義為bufferCount,并行處理數(shù)據(jù)的方法為consume,如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err) return } if p.Stock <= 0 { logx.Errorf("stock is zero pid: %d", m.Pid) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } } }
怎么保證不會(huì)超賣
當(dāng)秒殺活動(dòng)開(kāi)始后,大量用戶點(diǎn)擊商品詳情頁(yè)上的秒殺按鈕,會(huì)產(chǎn)生大量的并發(fā)請(qǐng)求查詢庫(kù)存,一旦某個(gè)請(qǐng)求查詢到有庫(kù)存,緊接著系統(tǒng)就會(huì)進(jìn)行庫(kù)存的扣減。然后,系統(tǒng)生成實(shí)際的訂單,并進(jìn)行后續(xù)的處理。如果請(qǐng)求查不到庫(kù)存,就會(huì)返回,用戶通常會(huì)繼續(xù)點(diǎn)擊秒殺按鈕,繼續(xù)查詢庫(kù)存。簡(jiǎn)單來(lái)說(shuō),這個(gè)階段的操作就是三個(gè):檢查庫(kù)存,庫(kù)存扣減、和訂單處理。因?yàn)槊總€(gè)秒殺請(qǐng)求都會(huì)查詢庫(kù)存,而請(qǐng)求只有查到庫(kù)存有余量后,后續(xù)的庫(kù)存扣減和訂單處理才會(huì)被執(zhí)行,所以,這個(gè)階段中最大的并發(fā)壓力都在庫(kù)存檢查操作上。
為了支撐大量高并發(fā)的庫(kù)存檢查請(qǐng)求,我們需要使用Redis單獨(dú)保存庫(kù)存量。那么,庫(kù)存扣減和訂單處理是否都可以交給Mysql來(lái)處理呢?其實(shí),訂單的處理是可以在數(shù)據(jù)庫(kù)中執(zhí)行的,但庫(kù)存扣減操作不能交給Mysql直接處理。因?yàn)榈搅藢?shí)際的訂單處理環(huán)節(jié),請(qǐng)求的壓力已經(jīng)不大了,數(shù)據(jù)庫(kù)完全可以支撐這些訂單處理請(qǐng)求。那為什么庫(kù)存扣減不能直接在數(shù)據(jù)庫(kù)中執(zhí)行呢?這是因?yàn)椋坏┱?qǐng)求查到有庫(kù)存,就意味著該請(qǐng)求獲得購(gòu)買資格,緊接著就會(huì)進(jìn)行下單操作,同時(shí)庫(kù)存量會(huì)減一,這個(gè)時(shí)候如果直接操作數(shù)據(jù)庫(kù)來(lái)扣減庫(kù)存可能就會(huì)導(dǎo)致超賣問(wèn)題。
直接操作數(shù)據(jù)庫(kù)扣減庫(kù)存為什么會(huì)導(dǎo)致超賣呢?由于數(shù)據(jù)庫(kù)的處理速度較慢,不能及時(shí)更新庫(kù)存余量,這就會(huì)導(dǎo)致大量的查詢庫(kù)存的請(qǐng)求讀取到舊的庫(kù)存值,并進(jìn)行下單,此時(shí)就會(huì)出現(xiàn)下單數(shù)量大于實(shí)際的庫(kù)存量,導(dǎo)致超賣。所以,就需要直接在Redis中進(jìn)行庫(kù)存扣減,具體的操作是,當(dāng)庫(kù)存檢查完后,一旦庫(kù)存有余量,我們就立即在Redis中扣減庫(kù)存,同時(shí),為了避免請(qǐng)求查詢到舊的庫(kù)存值,庫(kù)存檢查和庫(kù)存扣減這兩個(gè)操作需要保證原子性。
我們使用Redis的Hash來(lái)存儲(chǔ)庫(kù)存,total為總庫(kù)存,seckill為已秒殺的數(shù)量,為了保證查詢庫(kù)存和減庫(kù)存的原子性,我們使用Lua腳本進(jìn)行原子操作,讓秒殺量小于庫(kù)存的時(shí)候返回1,表示秒殺成功,否則返回0,表示秒殺失敗,代碼如下:
const ( luaCheckAndUpdateScript = ` local counts = redis.call("HMGET", KEYS[1], "total", "seckill") local total = tonumber(counts[1]) local seckill = tonumber(counts[2]) if seckill + 1 <= total then redis.call("HINCRBY", KEYS[1], "seckill", 1) return 1 end return 0 ` ) func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) { val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)}) if err != nil { return nil, err } if val.(int64) == 0 { return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId)) } return &product.CheckAndUpdateStockResponse{}, nil } func stockKey(pid int64) string { return fmt.Sprintf("stock:%d", pid) }
對(duì)應(yīng)的seckill-rmq代碼修改如下:
func (s *Service) consume(ch chan *KafkaData) { defer s.waiter.Done() for { m, ok := <-ch if !ok { log.Fatal("seckill rmq exit") } fmt.Printf("consume msg: %+v\n", m) _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid}) if err != nil { logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err) return } _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid}) if err != nil { logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err) return } _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1}) if err != nil { logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err) } } }
到這里,我們已經(jīng)了解了如何使用原子性的Lua腳本來(lái)實(shí)現(xiàn)庫(kù)存的檢查和扣減。其實(shí)要想保證庫(kù)存檢查和扣減的原子性,還有另外一種方法,那就是使用分布式鎖。
分布式鎖的實(shí)現(xiàn)方式有很多種,可以基于Redis、Etcd等等,用Redis實(shí)現(xiàn)分布式鎖的文章比較多,感興趣的可以自行搜索參考。這里給大家簡(jiǎn)單介紹下基于Etcd來(lái)實(shí)現(xiàn)分布式鎖。為了簡(jiǎn)化分布式鎖、分布式選舉、分布式事務(wù)的實(shí)現(xiàn),etcd社區(qū)提供了一個(gè)名為concurrency的包來(lái)幫助我們更簡(jiǎn)單、正確的使用分布式鎖。它的實(shí)現(xiàn)非常簡(jiǎn)單,主要流程如下:
- 首先通過(guò)concurrency.NewSession方法創(chuàng)建Session,本質(zhì)上是創(chuàng)建了一個(gè)TTL為10的Lease
- 得到Session對(duì)象后,通過(guò)concurrency.NewMutex創(chuàng)建一個(gè)mutex對(duì)象,包括了Lease、key prefix等信息
- 然后聽(tīng)過(guò)mutex對(duì)象的Lock方法嘗試獲取鎖
- 最后通過(guò)mutex對(duì)象的Unlock方法釋放鎖
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() session, err := concurrency.NewSession(cli, concurrency.WithTTL(10)) if err != nil { log.Fatal(err) } defer session.Close() mux := concurrency.NewMutex(session, "lock") if err := mux.Lock(context.Background()); err != nil { log.Fatal(err) } if err := mux.Unlock(context.Background()); err != nil { log.Fatal(err) }
結(jié)束語(yǔ)
本篇文章主要是針對(duì)秒殺功能繼續(xù)做了一些優(yōu)化。在Kafka消息的生產(chǎn)端做了批量消息聚合發(fā)送的優(yōu)化,Batch思想在實(shí)際生產(chǎn)開(kāi)發(fā)中使用非常多,希望大家能夠活靈活用,在消息的消費(fèi)端通過(guò)增加并行度來(lái)提升吞吐能力,這也是提升性能常用的優(yōu)化手段。最后介紹了可能導(dǎo)致超賣的原因,以及給出了相對(duì)應(yīng)的解決方案。同時(shí),介紹了基于Etcd的分布式鎖,在分布式服務(wù)中經(jīng)常出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)的問(wèn)題,一般可以通過(guò)分布式鎖來(lái)解決,但分布式鎖的引入勢(shì)必會(huì)導(dǎo)致性能的下降,所以,還需要結(jié)合實(shí)際情況考慮是否需要引入分布式鎖。
代碼倉(cāng)庫(kù):https://github.com/zhoushuguang/lebron
項(xiàng)目地址https://github.com/zeromicro/go-zero
以上就是go zero微服務(wù)實(shí)戰(zhàn)性能優(yōu)化極致秒殺的詳細(xì)內(nèi)容,更多關(guān)于go zero微服務(wù)秒殺優(yōu)化的資料請(qǐng)關(guān)注本站其它相關(guān)文章!
版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。