使用go實現(xiàn)一個超級mini的消息隊列的示例代碼
趁著有空余時間,就想著擼一個mini的生產(chǎn)-消費消息隊列,說干就干了。自己是個javer,這次實現(xiàn),特意換用了go。沒錯,是零基礎(chǔ)上手go,順便可以學(xué)學(xué)go。
前置知識:
- go基本語法
- 消息隊列概念,也就三個:生產(chǎn)者、消費者、隊列
目的
- 沒想著實現(xiàn)多復(fù)雜,因為時間有限,就mini就好,mini到什么程度呢
- 使用雙向鏈表數(shù)據(jù)結(jié)構(gòu)作為隊列
- 有多個topic可供生產(chǎn)者生成消息和消費者消費消息
- 支持生產(chǎn)者并發(fā)寫
- 支持消費者讀,且ok后,從隊列刪除
- 消息不丟失(持久化)
- 高性能(先這樣想)
設(shè)計
整體架構(gòu)
協(xié)議
通訊協(xié)議底層使用tcp,mq是基于tcp自定義了一個協(xié)議,協(xié)議如下
type Msg struct { Id int64 TopicLen int64 Topic string // 1-consumer 2-producer 3-comsumer-ack 4-error MsgType int64 // 消息類型 Len int64 // 消息長度 Payload []byte // 消息 }
Payload使用字節(jié)數(shù)組,是因為不管數(shù)據(jù)是什么,只當(dāng)做字節(jié)數(shù)組來處理即可。Msg承載著生產(chǎn)者生產(chǎn)的消息,消費者消費的消息,ACK、和錯誤消息,前兩者會有負(fù)載,而后兩者負(fù)載和長度都為空
協(xié)議的編解碼處理,就是對字節(jié)的處理,接下來有從字節(jié)轉(zhuǎn)為Msg,和從Msg轉(zhuǎn)為字節(jié)兩個函數(shù)
func BytesToMsg(reader io.Reader) Msg { m := Msg{} var buf [128]byte n, err := reader.Read(buf[:]) if err != nil { fmt.Println("read failed, err:", err) } fmt.Println("read bytes:", n) // id buff := bytes.NewBuffer(buf[0:8]) binary.Read(buff, binary.LittleEndian, &m.Id) // topiclen buff = bytes.NewBuffer(buf[8:16]) binary.Read(buff, binary.LittleEndian, &m.TopicLen) // topic msgLastIndex := 16 + m.TopicLen m.Topic = string(buf[16: msgLastIndex]) // msgtype buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 8]) binary.Read(buff, binary.LittleEndian, &m.MsgType) buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex + 16]) binary.Read(buff, binary.LittleEndian, &m.Len) if m.Len <= 0 { return m } m.Payload = buf[msgLastIndex + 16:] return m } func MsgToBytes(msg Msg) []byte { msg.TopicLen = int64(len([]byte(msg.Topic))) msg.Len = int64(len([]byte(msg.Payload))) var data []byte buf := bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.Id) data = append(data, buf.Bytes()...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.TopicLen) data = append(data, buf.Bytes()...) data = append(data, []byte(msg.Topic)...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.MsgType) data = append(data, buf.Bytes()...) buf = bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, msg.Len) data = append(data, buf.Bytes()...) data = append(data, []byte(msg.Payload)...) return data }
隊列
使用container/list,實現(xiàn)先入先出,生產(chǎn)者在隊尾寫,消費者在隊頭讀取
package broker import ( "container/list" "sync" ) type Queue struct { len int data list.List } var lock sync.Mutex func (queue *Queue) offer(msg Msg) { queue.data.PushBack(msg) queue.len = queue.data.Len() } func (queue *Queue) poll() Msg{ if queue.len == 0 { return Msg{} } msg := queue.data.Front() return msg.Value.(Msg) } func (queue *Queue) delete(id int64) { lock.Lock() for msg := queue.data.Front(); msg != nil; msg = msg.Next() { if msg.Value.(Msg).Id == id { queue.data.Remove(msg) queue.len = queue.data.Len() break } } lock.Unlock() }
方法offer往隊列里插入數(shù)據(jù),poll從隊列頭讀取數(shù)據(jù)素,delete根據(jù)消息ID從隊列刪除數(shù)據(jù)。這里使用Queue結(jié)構(gòu)體對List進行封裝,其實是有必要的,List作為底層的數(shù)據(jù)結(jié)構(gòu),我們希望隱藏更多的底層操作,只給客戶提供基本的操作
delete操作是在消費者消費成功且發(fā)送ACK后,對消息從隊列里移除的,因為消費者可以多個同時消費,所以這里進入臨界區(qū)時加鎖(em,加鎖是否就一定會影響對性能有較大的影響呢)
broker
broker作為服務(wù)器角色,負(fù)責(zé)接收連接,接收和響應(yīng)請求
package broker import ( "bufio" "net" "os" "sync" "time" ) var topics = sync.Map{} func handleErr(conn net.Conn) { defer func() { if err := recover(); err != nil { println(err.(string)) conn.Write(MsgToBytes(Msg{MsgType: 4})) } }() } func Process(conn net.Conn) { handleErr(conn) reader := bufio.NewReader(conn) msg := BytesToMsg(reader) queue, ok := topics.Load(msg.Topic) var res Msg if msg.MsgType == 1 { // comsumer if queue == nil || queue.(*Queue).len == 0{ return } msg = queue.(*Queue).poll() msg.MsgType = 1 res = msg } else if msg.MsgType == 2 { // producer if ! ok { queue = &Queue{} queue.(*Queue).data.Init() topics.Store(msg.Topic, queue) } queue.(*Queue).offer(msg) res = Msg{Id: msg.Id, MsgType: 2} } else if msg.MsgType == 3 { // consumer ack if queue == nil { return } queue.(*Queue).delete(msg.Id) } conn.Write(MsgToBytes(res)) }
MsgType等于1時,直接消費消息;MsgType等于2時是生產(chǎn)者生產(chǎn)消息,如果隊列為空,那么還需創(chuàng)建一個新的隊列,放在對應(yīng)的topic下;MsgType等于3時,代表消費者成功消費,可以
刪除消息
我們說消息不丟失,這里實現(xiàn)不完全,我就實現(xiàn)了持久化(持久化也沒全部實現(xiàn))。思路就是該topic對應(yīng)的隊列里的消息,按協(xié)議格式進行序列化,當(dāng)broker啟動時,從文件恢復(fù)
持久化需要考慮的是增量還是全量,需要保存多久,這些都會影響實現(xiàn)的難度和性能(想想Kafka和Redis的持久化),這里表示簡單實現(xiàn)就好:定時器定時保存
func Save() { ticker := time.NewTicker(60) for { select { case <-ticker.C: topics.Range(func(key, value interface{}) bool { if value == nil { return false } file, _ := os.Open(key.(string)) if file == nil { file, _ = os.Create(key.(string)) } for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() { file.Write(MsgToBytes(msg.Value.(Msg))) } _ := file.Close() return false }) default: time.Sleep(1) } } }
有一個問題是,當(dāng)上面的delete操作時,這里的file文件需不需要跟著delete掉對應(yīng)的消息?答案是需要刪除的,如果不刪除,只能等下一次的全量持久化來覆蓋了,中間就有臟數(shù)據(jù)問題
下面是啟動邏輯
package main import ( "awesomeProject/broker" "fmt" "net" ) func main() { listen, err := net.Listen("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("listen failed, err:", err) return } go broker.Save() for { conn, err := listen.Accept() if err != nil { fmt.Print("accept failed, err:", err) continue } go broker.Process(conn) } }
生產(chǎn)者
package main import ( "awesomeProject/broker" "fmt" "net" ) func produce() { conn, err := net.Dial("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("connect failed, err:", err) } defer conn.Close() msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")} n, err := conn.Write(broker.MsgToBytes(msg)) if err != nil { fmt.Print("write failed, err:", err) } fmt.Print(n) }
消費者
package main import ( "awesomeProject/broker" "bytes" "fmt" "net" ) func comsume() { conn, err := net.Dial("tcp", "127.0.0.1:12345") if err != nil { fmt.Print("connect failed, err:", err) } defer conn.Close() msg := broker.Msg{Topic: "topic-test", MsgType: 1} n, err := conn.Write(broker.MsgToBytes(msg)) if err != nil { fmt.Println("write failed, err:", err) } fmt.Println("n", n) var res [128]byte conn.Read(res[:]) buf := bytes.NewBuffer(res[:]) receMsg := broker.BytesToMsg(buf) fmt.Print(receMsg) // ack conn, _ = net.Dial("tcp", "127.0.0.1:12345") l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3})) if e != nil { fmt.Println("write failed, err:", err) } fmt.Println("l:", l) }
消費者這里ack時重新創(chuàng)建了連接,如果不創(chuàng)建連接的話,那服務(wù)端那里就需要一直從conn讀取數(shù)據(jù),直到結(jié)束。思考一下,像RabbitMQ的ack就有自動和手工的ack,如果是手工的ack,必然需要一個新的連接,因為不知道客戶端什么時候發(fā)送ack,自動的話,當(dāng)然可以使用同一個連接,but這里就簡單創(chuàng)建一條新連接吧
啟動
先啟動broker,再啟動producer,然后啟動comsumer,OK,能跑,能實現(xiàn)發(fā)送消息到隊列,從隊列消費消息
總結(jié)
整體雖然簡單,但畢竟是使用go實現(xiàn)的,就是看似一頓操作猛如虎,實質(zhì)慌如狗。第一時間就被go的gopath和go mod困擾住,后面語法的使用,比如指針,傳值傳引用等,最頭疼的就是類型轉(zhuǎn)換,作為一個javer,使用go進行類型轉(zhuǎn)換,著實被狠狠得虐了一番。
到此這篇關(guān)于使用go實現(xiàn)一個超級mini的消息隊列的示例代碼的文章就介紹到這了,更多相關(guān)go mini消息隊列內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!?
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。