人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

go?zero微服務(wù)實(shí)戰(zhàn)性能優(yōu)化極致秒殺

發(fā)布日期:2022-07-15 19:30 | 文章來(lái)源:腳本之家

引言

上一篇文章中引入了消息隊(duì)列對(duì)秒殺流量做削峰的處理,我們使用的是Kafka,看起來(lái)似乎工作的不錯(cuò),但其實(shí)還是有很多隱患存在,如果這些隱患不優(yōu)化處理掉,那么秒殺搶購(gòu)活動(dòng)開(kāi)始后可能會(huì)出現(xiàn)消息堆積、消費(fèi)延遲、數(shù)據(jù)不一致、甚至服務(wù)崩潰等問(wèn)題,那么后果可想而知。本篇文章我們就一起來(lái)把這些隱患解決掉。

批量數(shù)據(jù)聚合

SeckillOrder這個(gè)方法中,每來(lái)一次秒殺搶購(gòu)請(qǐng)求都往往Kafka中發(fā)送一條消息。假如這個(gè)時(shí)候有一千萬(wàn)的用戶同時(shí)來(lái)?yè)屬?gòu),就算我們做了各種限流策略,一瞬間還是可能會(huì)有上百萬(wàn)的消息會(huì)發(fā)到Kafka,會(huì)產(chǎn)生大量的網(wǎng)絡(luò)IO和磁盤IO成本,大家都知道Kafka是基于日志的消息系統(tǒng),寫消息雖然大多情況下都是順序IO,但當(dāng)海量的消息同時(shí)寫入的時(shí)候還是可能會(huì)扛不住。

那怎么解決這個(gè)問(wèn)題呢?答案是做消息的聚合。之前發(fā)送一條消息就會(huì)產(chǎn)生一次網(wǎng)絡(luò)IO和一次磁盤IO,我們做消息聚合后,比如聚合100條消息后再發(fā)送給Kafka,這個(gè)時(shí)候100條消息才會(huì)產(chǎn)生一次網(wǎng)絡(luò)IO和磁盤IO,對(duì)整個(gè)Kafka的吞吐和性能是一個(gè)非常大的提升。其實(shí)這就是一種小包聚合的思想,或者叫Batch或者批量的思想。這種思想也隨處可見(jiàn),比如我們使用Mysql插入批量數(shù)據(jù)的時(shí)候,可以通過(guò)一條SQL語(yǔ)句執(zhí)行而不是循環(huán)的一條一條插入,還有Redis的Pipeline操作等等。

那怎么來(lái)聚合呢,聚合策略是啥呢?聚合策略有兩個(gè)維度分別是聚合消息條數(shù)和聚合時(shí)間,比如聚合消息達(dá)到100條我們就往Kafka發(fā)送一次,這個(gè)條數(shù)是可以配置的,那如果一直也達(dá)不到100條消息怎么辦呢?通過(guò)聚合時(shí)間來(lái)兜底,這個(gè)聚合時(shí)間也是可以配置的,比如配置聚合時(shí)間為1秒鐘,也就是無(wú)論目前聚合了多少條消息只要聚合時(shí)間達(dá)到1秒,那么就往Kafka發(fā)送一次數(shù)據(jù)。聚合條數(shù)和聚合時(shí)間是或的關(guān)系,也就是只要有一個(gè)條件滿足就觸發(fā)。

在這里我們提供一個(gè)批量聚合數(shù)據(jù)的工具Batcher,定義如下

type Batcher struct {
  opts options
  Do func(ctx context.Context, val map[string][]interface{})
  Sharding func(key string) int
  chans []chan *msg
  wait  sync.WaitGroup
}

Do方法:滿足聚合條件后就會(huì)執(zhí)行Do方法,其中val參數(shù)為聚合后的數(shù)據(jù)

Sharding方法:通過(guò)Key進(jìn)行sharding,相同的key消息寫入到同一個(gè)channel中,被同一個(gè)goroutine處理

在merge方法中有兩個(gè)觸發(fā)執(zhí)行Do方法的條件,一是當(dāng)聚合的數(shù)據(jù)條數(shù)大于等于設(shè)置的條數(shù),二是當(dāng)觸發(fā)設(shè)置的定時(shí)器

代碼實(shí)現(xiàn)比較簡(jiǎn)單,如下為具體實(shí)現(xiàn):

type msg struct {
  key string
  val interface{}
}
type Batcher struct {
  opts options
  Do func(ctx context.Context, val map[string][]interface{})
  Sharding func(key string) int
  chans []chan *msg
  wait  sync.WaitGroup
}
func New(opts ...Option) *Batcher {
  b := &Batcher{}
  for _, opt := range opts {
 opt.apply(&b.opts)
  }
  b.opts.check()
  b.chans = make([]chan *msg, b.opts.worker)
  for i := 0; i < b.opts.worker; i++ {
 b.chans[i] = make(chan *msg, b.opts.buffer)
  }
  return b
}
func (b *Batcher) Start() {
  if b.Do == nil {
 log.Fatal("Batcher: Do func is nil")
  }
  if b.Sharding == nil {
 log.Fatal("Batcher: Sharding func is nil")
  }
  b.wait.Add(len(b.chans))
  for i, ch := range b.chans {
 go b.merge(i, ch)
  }
}
func (b *Batcher) Add(key string, val interface{}) error {
  ch, msg := b.add(key, val)
  select {
  case ch <- msg:
  default:
 return ErrFull
  }
  return nil
}
func (b *Batcher) add(key string, val interface{}) (chan *msg, *msg) {
  sharding := b.Sharding(key) % b.opts.worker
  ch := b.chans[sharding]
  msg := &msg{key: key, val: val}
  return ch, msg
}
func (b *Batcher) merge(idx int, ch <-chan *msg) {
  defer b.wait.Done()
  var (
 msg  *msg
 countint
 closed  bool
 lastTicker = true
 interval= b.opts.interval
 vals = make(map[string][]interface{}, b.opts.size)
  )
  if idx > 0 {
 interval = time.Duration(int64(idx) * (int64(b.opts.interval) / int64(b.opts.worker)))
  }
  ticker := time.NewTicker(interval)
  for {
 select {
 case msg = <-ch:
if msg == nil {
  closed = true
  break
}
count++
vals[msg.key] = append(vals[msg.key], msg.val)
if count >= b.opts.size {
  break
}
continue
 case <-ticker.C:
if lastTicker {
  ticker.Stop()
  ticker = time.NewTicker(b.opts.interval)
  lastTicker = false
}
 }
 if len(vals) > 0 {
ctx := context.Background()
b.Do(ctx, vals)
vals = make(map[string][]interface{}, b.opts.size)
count = 0
 }
 if closed {
ticker.Stop()
return
 }
  }
}
func (b *Batcher) Close() {
  for _, ch := range b.chans {
 ch <- nil
  }
  b.wait.Wait()
}

使用的時(shí)候需要先創(chuàng)建一個(gè)Batcher,然后定義Batcher的Sharding方法和Do方法,在Sharding方法中通過(guò)ProductID把不同商品的聚合投遞到不同的goroutine中處理,在Do方法中我們把聚合的數(shù)據(jù)一次性批量的發(fā)送到Kafka,定義如下:

b := batcher.New(
  batcher.WithSize(batcherSize),
  batcher.WithBuffer(batcherBuffer),
  batcher.WithWorker(batcherWorker),
  batcher.WithInterval(batcherInterval),
)
b.Sharding = func(key string) int {
  pid, _ := strconv.ParseInt(key, 10, 64)
  return int(pid) % batcherWorker
}
b.Do = func(ctx context.Context, val map[string][]interface{}) {
  var msgs []*KafkaData
  for _, vs := range val {
 for _, v := range vs {
msgs = append(msgs, v.(*KafkaData))
 }
  }
  kd, err := json.Marshal(msgs)
  if err != nil {
 logx.Errorf("Batcher.Do json.Marshal msgs: %v error: %v", msgs, err)
  }
  if err = s.svcCtx.KafkaPusher.Push(string(kd)); err != nil {
 logx.Errorf("KafkaPusher.Push kd: %s error: %v", string(kd), err)
  }
}
s.batcher = b
s.batcher.Start()

SeckillOrder方法中不再是每來(lái)一次請(qǐng)求就往Kafka中投遞一次消息,而是先通過(guò)batcher提供的Add方法添加到Batcher中等待滿足聚合條件后再往Kafka中投遞。

err = l.batcher.Add(strconv.FormatInt(in.ProductId, 10), &KafkaData{Uid: in.UserId, Pid: in.ProductId})
if err!= nil {
 logx.Errorf("l.batcher.Add uid: %d pid: %d error: %v", in.UserId, in.ProductId, err)
}

降低消息的消費(fèi)延遲

通過(guò)批量消息處理的思想,我們提供了Batcher工具,提升了性能,但這主要是針對(duì)生產(chǎn)端而言的。當(dāng)我們消費(fèi)到批量的數(shù)據(jù)后,還是需要串行的一條條的處理數(shù)據(jù),那有沒(méi)有辦法能加速消費(fèi)從而降低消費(fèi)消息的延遲呢?有兩種方案分別是:

  • 增加消費(fèi)者的數(shù)量
  • 在一個(gè)消費(fèi)者中增加消息處理的并行度

因?yàn)樵贙afka中,一個(gè)Topci可以配置多個(gè)Partition,數(shù)據(jù)會(huì)被平均或者按照生產(chǎn)者指定的方式寫入到多個(gè)分區(qū)中,那么在消費(fèi)的時(shí)候,Kafka約定一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi),為什么要這么設(shè)計(jì)呢?我理解的是如果有多個(gè)Consumer同時(shí)消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),那么在操作這個(gè)消費(fèi)進(jìn)度的時(shí)候就需要加鎖,對(duì)性能影響比較大。所以說(shuō)當(dāng)消費(fèi)者數(shù)量小于分區(qū)數(shù)量的時(shí)候,我們可以增加消費(fèi)者的數(shù)量來(lái)增加消息處理能力,但當(dāng)消費(fèi)者數(shù)量大于分區(qū)的時(shí)候再繼續(xù)增加消費(fèi)者數(shù)量就沒(méi)有意義了。

不能增加Consumer的時(shí)候,可以在同一個(gè)Consumer中提升處理消息的并行度,即通過(guò)多個(gè)goroutine來(lái)并行的消費(fèi)數(shù)據(jù),我們一起來(lái)看看如何通過(guò)多個(gè)goroutine來(lái)消費(fèi)消息。

在Service中定義msgsChan,msgsChan為Slice,Slice的長(zhǎng)度表示有多少個(gè)goroutine并行的處理數(shù)據(jù),初始化如下:

func NewService(c config.Config) *Service {
  s := &Service{
 c: c,
 ProductRPC: product.NewProduct(zrpc.MustNewClient(c.ProductRPC)),
 OrderRPC:order.NewOrder(zrpc.MustNewClient(c.OrderRPC)),
 msgsChan:make([]chan *KafkaData, chanCount),
  }
  for i := 0; i < chanCount; i++ {
 ch := make(chan *KafkaData, bufferCount)
 s.msgsChan[i] = ch
 s.waiter.Add(1)
 go s.consume(ch)
  }
  return s
}

從Kafka中消費(fèi)到數(shù)據(jù)后,把數(shù)據(jù)投遞到Channel中,注意投遞消息的時(shí)候按照商品的id做Sharding,這能保證在同一個(gè)Consumer中對(duì)同一個(gè)商品的處理是串行的,串行的數(shù)據(jù)處理不會(huì)導(dǎo)致并發(fā)帶來(lái)的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題

func (s *Service) Consume(_ string, value string) error {
  logx.Infof("Consume value: %s\n", value)
  var data []*KafkaData
  if err := json.Unmarshal([]byte(value), &data); err != nil {
 return err
  }
  for _, d := range data {
 s.msgsChan[d.Pid%chanCount] <- d
  }
  return nil
}

我們定義了chanCount個(gè)goroutine同時(shí)處理數(shù)據(jù),每個(gè)channel的長(zhǎng)度定義為bufferCount,并行處理數(shù)據(jù)的方法為consume,如下:

func (s *Service) consume(ch chan *KafkaData) {
  defer s.waiter.Done()
  for {
 m, ok := <-ch
 if !ok {
log.Fatal("seckill rmq exit")
 }
 fmt.Printf("consume msg: %+v\n", m)
 p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: m.Pid})
 if err != nil {
logx.Errorf("s.ProductRPC.Product pid: %d error: %v", m.Pid, err)
return
 }
 if p.Stock <= 0 {
logx.Errorf("stock is zero pid: %d", m.Pid)
return
 }
 _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
 if err != nil {
logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
return
 }
 _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
 if err != nil {
logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
 }
  }
}

怎么保證不會(huì)超賣

當(dāng)秒殺活動(dòng)開(kāi)始后,大量用戶點(diǎn)擊商品詳情頁(yè)上的秒殺按鈕,會(huì)產(chǎn)生大量的并發(fā)請(qǐng)求查詢庫(kù)存,一旦某個(gè)請(qǐng)求查詢到有庫(kù)存,緊接著系統(tǒng)就會(huì)進(jìn)行庫(kù)存的扣減。然后,系統(tǒng)生成實(shí)際的訂單,并進(jìn)行后續(xù)的處理。如果請(qǐng)求查不到庫(kù)存,就會(huì)返回,用戶通常會(huì)繼續(xù)點(diǎn)擊秒殺按鈕,繼續(xù)查詢庫(kù)存。簡(jiǎn)單來(lái)說(shuō),這個(gè)階段的操作就是三個(gè):檢查庫(kù)存,庫(kù)存扣減、和訂單處理。因?yàn)槊總€(gè)秒殺請(qǐng)求都會(huì)查詢庫(kù)存,而請(qǐng)求只有查到庫(kù)存有余量后,后續(xù)的庫(kù)存扣減和訂單處理才會(huì)被執(zhí)行,所以,這個(gè)階段中最大的并發(fā)壓力都在庫(kù)存檢查操作上。

為了支撐大量高并發(fā)的庫(kù)存檢查請(qǐng)求,我們需要使用Redis單獨(dú)保存庫(kù)存量。那么,庫(kù)存扣減和訂單處理是否都可以交給Mysql來(lái)處理呢?其實(shí),訂單的處理是可以在數(shù)據(jù)庫(kù)中執(zhí)行的,但庫(kù)存扣減操作不能交給Mysql直接處理。因?yàn)榈搅藢?shí)際的訂單處理環(huán)節(jié),請(qǐng)求的壓力已經(jīng)不大了,數(shù)據(jù)庫(kù)完全可以支撐這些訂單處理請(qǐng)求。那為什么庫(kù)存扣減不能直接在數(shù)據(jù)庫(kù)中執(zhí)行呢?這是因?yàn)椋坏┱?qǐng)求查到有庫(kù)存,就意味著該請(qǐng)求獲得購(gòu)買資格,緊接著就會(huì)進(jìn)行下單操作,同時(shí)庫(kù)存量會(huì)減一,這個(gè)時(shí)候如果直接操作數(shù)據(jù)庫(kù)來(lái)扣減庫(kù)存可能就會(huì)導(dǎo)致超賣問(wèn)題。

直接操作數(shù)據(jù)庫(kù)扣減庫(kù)存為什么會(huì)導(dǎo)致超賣呢?由于數(shù)據(jù)庫(kù)的處理速度較慢,不能及時(shí)更新庫(kù)存余量,這就會(huì)導(dǎo)致大量的查詢庫(kù)存的請(qǐng)求讀取到舊的庫(kù)存值,并進(jìn)行下單,此時(shí)就會(huì)出現(xiàn)下單數(shù)量大于實(shí)際的庫(kù)存量,導(dǎo)致超賣。所以,就需要直接在Redis中進(jìn)行庫(kù)存扣減,具體的操作是,當(dāng)庫(kù)存檢查完后,一旦庫(kù)存有余量,我們就立即在Redis中扣減庫(kù)存,同時(shí),為了避免請(qǐng)求查詢到舊的庫(kù)存值,庫(kù)存檢查和庫(kù)存扣減這兩個(gè)操作需要保證原子性。

我們使用Redis的Hash來(lái)存儲(chǔ)庫(kù)存,total為總庫(kù)存,seckill為已秒殺的數(shù)量,為了保證查詢庫(kù)存和減庫(kù)存的原子性,我們使用Lua腳本進(jìn)行原子操作,讓秒殺量小于庫(kù)存的時(shí)候返回1,表示秒殺成功,否則返回0,表示秒殺失敗,代碼如下:

const (
  luaCheckAndUpdateScript = `
local counts = redis.call("HMGET", KEYS[1], "total", "seckill")
local total = tonumber(counts[1])
local seckill = tonumber(counts[2])
if seckill + 1 <= total then
  redis.call("HINCRBY", KEYS[1], "seckill", 1)
  return 1
end
return 0
`
)
func (l *CheckAndUpdateStockLogic) CheckAndUpdateStock(in *product.CheckAndUpdateStockRequest) (*product.CheckAndUpdateStockResponse, error) {
  val, err := l.svcCtx.BizRedis.EvalCtx(l.ctx, luaCheckAndUpdateScript, []string{stockKey(in.ProductId)})
  if err != nil {
 return nil, err
  }
  if val.(int64) == 0 {
 return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("insufficient stock: %d", in.ProductId))
  }
  return &product.CheckAndUpdateStockResponse{}, nil
}
func stockKey(pid int64) string {
  return fmt.Sprintf("stock:%d", pid)
}

對(duì)應(yīng)的seckill-rmq代碼修改如下:

func (s *Service) consume(ch chan *KafkaData) {
  defer s.waiter.Done()
  for {
 m, ok := <-ch
 if !ok {
log.Fatal("seckill rmq exit")
 }
 fmt.Printf("consume msg: %+v\n", m)
 _, err := s.ProductRPC.CheckAndUpdateStock(context.Background(), &product.CheckAndUpdateStockRequest{ProductId: m.Pid})
 if err != nil {
logx.Errorf("s.ProductRPC.CheckAndUpdateStock pid: %d error: %v", m.Pid, err)
return
 }
 _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: m.Uid, Pid: m.Pid})
 if err != nil {
logx.Errorf("CreateOrder uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
return
 }
 _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: m.Pid, Num: 1})
 if err != nil {
logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", m.Uid, m.Pid, err)
 }
  }
}

到這里,我們已經(jīng)了解了如何使用原子性的Lua腳本來(lái)實(shí)現(xiàn)庫(kù)存的檢查和扣減。其實(shí)要想保證庫(kù)存檢查和扣減的原子性,還有另外一種方法,那就是使用分布式鎖。

分布式鎖的實(shí)現(xiàn)方式有很多種,可以基于Redis、Etcd等等,用Redis實(shí)現(xiàn)分布式鎖的文章比較多,感興趣的可以自行搜索參考。這里給大家簡(jiǎn)單介紹下基于Etcd來(lái)實(shí)現(xiàn)分布式鎖。為了簡(jiǎn)化分布式鎖、分布式選舉、分布式事務(wù)的實(shí)現(xiàn),etcd社區(qū)提供了一個(gè)名為concurrency的包來(lái)幫助我們更簡(jiǎn)單、正確的使用分布式鎖。它的實(shí)現(xiàn)非常簡(jiǎn)單,主要流程如下:

  • 首先通過(guò)concurrency.NewSession方法創(chuàng)建Session,本質(zhì)上是創(chuàng)建了一個(gè)TTL為10的Lease
  • 得到Session對(duì)象后,通過(guò)concurrency.NewMutex創(chuàng)建一個(gè)mutex對(duì)象,包括了Lease、key prefix等信息
  • 然后聽(tīng)過(guò)mutex對(duì)象的Lock方法嘗試獲取鎖
  • 最后通過(guò)mutex對(duì)象的Unlock方法釋放鎖
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {
log.Fatal(err)
}
defer session.Close()
mux := concurrency.NewMutex(session, "lock")
if err := mux.Lock(context.Background()); err != nil {
log.Fatal(err)
}
if err := mux.Unlock(context.Background()); err != nil {
log.Fatal(err)
}

結(jié)束語(yǔ)

本篇文章主要是針對(duì)秒殺功能繼續(xù)做了一些優(yōu)化。在Kafka消息的生產(chǎn)端做了批量消息聚合發(fā)送的優(yōu)化,Batch思想在實(shí)際生產(chǎn)開(kāi)發(fā)中使用非常多,希望大家能夠活靈活用,在消息的消費(fèi)端通過(guò)增加并行度來(lái)提升吞吐能力,這也是提升性能常用的優(yōu)化手段。最后介紹了可能導(dǎo)致超賣的原因,以及給出了相對(duì)應(yīng)的解決方案。同時(shí),介紹了基于Etcd的分布式鎖,在分布式服務(wù)中經(jīng)常出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)的問(wèn)題,一般可以通過(guò)分布式鎖來(lái)解決,但分布式鎖的引入勢(shì)必會(huì)導(dǎo)致性能的下降,所以,還需要結(jié)合實(shí)際情況考慮是否需要引入分布式鎖。

代碼倉(cāng)庫(kù):https://github.com/zhoushuguang/lebron

項(xiàng)目地址https://github.com/zeromicro/go-zero

以上就是go zero微服務(wù)實(shí)戰(zhàn)性能優(yōu)化極致秒殺的詳細(xì)內(nèi)容,更多關(guān)于go zero微服務(wù)秒殺優(yōu)化的資料請(qǐng)關(guān)注本站其它相關(guān)文章!

國(guó)外服務(wù)器租用

版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開(kāi)通

自選配置、實(shí)時(shí)開(kāi)通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問(wèn)服務(wù)

1對(duì)1客戶咨詢顧問(wèn)

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部