共计 9128 个字符,预计需要花费 23 分钟才能阅读完成。
协定
本篇咱们先来实现一下协定,所谓的协定,说白了就是规定了消费者的行为,并将这些行为转化成对 topic、channel 或者 message 的操作。例如,客户端发送 SUB order pay
,咱们就创立一个名为 order 的 topic,再在 topic 上面创立一个名为 pay 的 channel,最初将该客户端与该 channel 绑定,后续该客户端就能接管到生产者的音讯了。
现阶段咱们筹备实现四种协定,别离是 SUB(订阅)、GET(读取)、FIN(实现)和 REQ(重入),有需要的话前期再加。因为这种不确定性,咱们能够在运行时再确定咱们要调用的办法,这样能够保障不必改变咱们的外围代码。所以咱们在这里应用 Go 语言的反射机制来实现协定的外围代码,等到前期协定成熟了之后,咱们能够用 switch 重构外围,毕竟反射的性能还是比拟差的。
咱们先来定义一些客户端的谬误以及须要用到的常量和接口等等,对立放在 protocol/client_error 文件下了:
package protocol | |
import ("io") | |
const ( | |
ClientInit = iota | |
ClientWaitGet | |
ClientWaitResponse | |
) | |
type StatefulReadWriter interface { | |
io.ReadWriter | |
GetState() int | |
SetState(state int) | |
String() string} | |
type ClientError struct {errStr string} | |
func (e ClientError) Error() string {return e.errStr} | |
var (ClientErrInvalid = ClientError{"E_INVALID"} | |
ClientErrBadTopic = ClientError{"E_BAD_TOPIC"} | |
ClientErrBadChannel = ClientError{"E_BAD_CHANNEL"} | |
ClientErrBadMessage = ClientError{"E_BAD_MESSAGE"} | |
) |
执行代码
咱们来看一下协定的执行代码:
type Protocol struct {channel *message.Channel} | |
func (p *Protocol) IOLoop(client StatefulReadWriter) error { | |
var ( | |
err error | |
line string | |
resp []byte) | |
client.SetState(ClientInit) | |
reader := bufio.NewReader(client) | |
for {line, err = reader.ReadString('\n') | |
if err != nil {break} | |
line = strings.Replace(line, "\n", "", -1) | |
line = strings.Replace(line, "\r", "", -1) | |
params := strings.Split(line, " ") | |
log.Printf("PROTOCOL: %#v", params) | |
resp, err = p.Execute(client, params...) | |
if err != nil {_, err = client.Write([]byte(err.Error())) | |
if err != nil {break} | |
continue | |
} | |
if resp != nil {_, err = client.Write(resp) | |
if err != nil {break} | |
} | |
} | |
return err | |
} | |
// Execute use reflection to call the appropriate method for this command | |
func (p *Protocol) Execute(client StatefulReadWriter, params ...string) ([]byte, error) { | |
var ( | |
err error | |
resp []byte) | |
typ := reflect.TypeOf(p) | |
args := make([]reflect.Value, 3) | |
args[0] = reflect.ValueOf(p) | |
args[1] = reflect.ValueOf(client) | |
cmd := strings.ToUpper(params[0]) | |
if method, ok := typ.MethodByName(cmd); ok {args[2] = reflect.ValueOf(params) | |
returnValues := method.Func.Call(args) | |
if !returnValues[0].IsNil() {resp = returnValues[0].Interface().([]byte) | |
} | |
if !returnValues[1].IsNil() {err = returnValues[1].Interface().(error) | |
} | |
return resp, err | |
} | |
return nil, ClientErrInvalid | |
} |
IOLoop 办法比较简单,就是循环从客户端逐行读取输出,简略地过滤后,拆分成各个参数传递给 Execute 办法,而后将后果往客户端写入。因而,Execute 办法才是真正的外围代码,它的思路就是以传入的 params 的第一项作为办法名,判断有无实现该函数并执行发射调用。例如客户端发送 SUB order pay
,就是先判断 &Protocol 有没有实现 SUB 办法,有的话就将 client 和 params 数组一起作为参数传给 SUB 办法执行调用,并返回调用后果。
有的同学可能会好奇为什么在 client 之前又加了个参数 p,也就是办法实现者自身。这是因为在 Go 语言中,类型 A 的办法实质上就是一个以 A 为第一个参数的函数,所以咱们反射调用的时候必须将 &Protocol 作为第一个参数。
具体实现
SUB
获取 topic,再获取 channel,最初绑定客户端连贯和 channel。
func (p *Protocol) SUB(client StatefulReadWriter, params []string) ([]byte, error) {if client.GetState() != ClientInit {return nil, ClientErrInvalid} | |
if len(params) < 3 {return nil, ClientErrInvalid} | |
topicName := params[1] | |
if len(topicName) == 0 {return nil, ClientErrBadTopic} | |
channelName := params[2] | |
if len(channelName) == 0 {return nil, ClientErrBadChannel} | |
client.SetState(ClientWaitGet) | |
topic := message.GetTopic(topicName) | |
p.channel = topic.GetChannel(channelName) | |
return nil, nil | |
} |
GET
向绑定的 channel 发送音讯,而后批改状态,就是这么简略。
func (p *Protocol) GET(client StatefulReadWriter, params []string) ([]byte, error) {if client.GetState() != ClientWaitGet {return nil, ClientErrInvalid} | |
msg := p.channel.PullMessage() | |
if msg == nil {log.Printf("ERROR: msg == nil") | |
return nil, ClientErrBadMessage | |
} | |
uuidStr := util.UuidToStr(msg.Uuid()) | |
log.Printf("PROTOCOL: writing msg(%s) to client(%s) - %s", uuidStr, client.String(), string(msg.Body())) | |
client.SetState(ClientWaitResponse) | |
return msg.Data(), nil} |
FIN
解释都算多余 …
func (p *Protocol) FIN(client StatefulReadWriter, params []string) ([]byte, error) {if client.GetState() != ClientWaitResponse {return nil, ClientErrInvalid} | |
if len(params) < 2 {return nil, ClientErrInvalid} | |
uuidStr := params[1] | |
err := p.channel.FinishMessage(uuidStr) | |
if err != nil {return nil, err} | |
client.SetState(ClientWaitGet) | |
return nil, nil | |
} |
REQ
同样也不必解释。
func (p *Protocol) REQ(client StatefulReadWriter, params []string) ([]byte, error) {if client.GetState() != ClientWaitResponse {return nil, ClientErrInvalid} | |
if len(params) < 2 {return nil, ClientErrInvalid} | |
uuidStr := params[1] | |
err := p.channel.RequeueMessage(uuidStr) | |
if err != nil {return nil, err} | |
client.SetState(ClientWaitGet) | |
return nil, nil | |
} |
最初别忘了在 client.go 中加一个办法,将咱们的协定用起来:
// Handle reads data from the client, keeps state, and responds. | |
func (c *Client) Handle() {defer c.Close() | |
proto := &protocol.Protocol{} | |
err := proto.IOLoop(c) | |
if err != nil {log.Printf("ERROR: client(%s) - %s", c.String(), err.Error()) | |
return | |
} | |
} |
有仔细的同学可能发现了,咱们是在协定中关联了 channel,在 channel 构造中保护的消费者数组仿佛并没有用到。这个没有关系,因为咱们设计的这个音讯队列实现的是“拉”模式,是由消费者被动拉取数据。如果咱们要扩大“推”模式,要向消费者被动推送数据的话,channel 中保护的消费者就能够派上用场了。
后盾队列
咱们之前在实现 channel 和 topic 时,会应用一个有缓冲管道 msgChan 作为内存队列暂存音讯,缓冲区满的时候就抉择抛弃音讯,当初咱们就来实现一个长久化的队列来保障这些音讯不失落。
接口和字段定义
老规矩,还是定义一个后盾队列的接口:queue/backend_queue.go
type Queue interface {Get() ([]byte, error) | |
Put([]byte) error | |
ReadReadyChan() chan struct{} | |
Close() error} |
工夫无限,这里只提供一个磁盘队列 DiskQueue 的实现,字段定义如下:
const maxFileSize = 1024 * 1024 * 100 | |
type DiskQueue struct { | |
name string | |
readPos int64 | |
writePos int64 | |
readFileNum int64 | |
writeFileNum int64 | |
readFile *os.File | |
writeFile *os.File | |
readChan chan struct{} | |
inChan chan util.ChanReq | |
outChan chan util.ChanRet | |
exitChan chan util.ChanReq | |
} |
后面几个字段就是读写的文件以及地位信息,前面的几个管道用于收发音讯,同时限度了单个文件的大小下限为 100 MB。
辅助函数
辅助函数分为三类:
- 长久化和重载元数据
- 生成和获取文件名
- 判断是否有数据可读
注:当读写的文件超过单个文件大小下限或者敞开队列时,咱们须要将读写的文件名和地位信息保留下来,不便下次启动时从新定位持续读写,须要保留的文件名和读写地位就称之为元数据。
func (d *DiskQueue) persistMetaData() (err error) {metaFileName := d.metaDataFileName() | |
tmpFileName := metaFileName + ".tmp" | |
f, err := os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) | |
if err != nil {return} | |
_, err = fmt.Fprintf(f, "%d,%d\n%d,%d\n", d.readFileNum, d.readPos, d.writeFileNum, d.writePos) | |
if err != nil {f.Close() | |
return | |
} | |
f.Close() | |
log.Printf("DISK: persisted meta data for (%s) - readFileNum=%d writeFileNum=%d readPos=%d writePos=%d", | |
d.name, d.readFileNum, d.writeFileNum, d.readPos, d.writePos) | |
return os.Rename(tmpFileName, metaFileName) | |
} | |
func (d *DiskQueue) retrieveMetaData() (err error) {metaFileName := d.metaDataFileName() | |
f, err := os.OpenFile(metaFileName, os.O_RDONLY, 0600) | |
if err != nil {return} | |
defer f.Close() | |
_, err = fmt.Fscanf(f, "%d,%d\n%d,%d\n", &d.readFileNum, &d.readPos, &d.writeFileNum, &d.writePos) | |
if err != nil {return} | |
log.Printf("DISK: retrieved meta data for (%s) - readFileNum=%d writeFileNum=%d readPos=%d writePos=%d", | |
d.name, d.readFileNum, d.writeFileNum, d.readPos, d.writePos) | |
return | |
} | |
func (d *DiskQueue) metaDataFileName() string {return fmt.Sprintf("%s.diskqueue.meta.dat", d.name) | |
} | |
func (d *DiskQueue) fileName(fileNum int64) string {return fmt.Sprintf("%s.diskqueue.%06d.dat", d.name, fileNum) | |
} | |
func (d *DiskQueue) hasDataToRead() bool {return (d.writeFileNum > d.readFileNum) || (d.writePos > d.readPos) | |
} |
读写音讯
读写音讯的逻辑就是对文件的读写,没有什么浏览难度,这里就不一一介绍了:
func (d *DiskQueue) readOne() ([]byte, error) { | |
var ( | |
err error | |
msgSize int32 | |
) | |
if d.readPos > maxFileSize { | |
d.readFileNum++ | |
d.readPos = 0 | |
d.readFile.Close() | |
d.readFile = nil | |
if err = d.persistMetaData(); err != nil {return nil, err} | |
} | |
if d.readFile == nil {d.readFile, err = os.OpenFile(d.fileName(d.readFileNum), os.O_RDONLY, 0600) | |
if err != nil {return nil, err} | |
if d.readPos > 0 {_, err = d.readFile.Seek(d.readPos, 0) | |
if err != nil {return nil, err} | |
} | |
} | |
err = binary.Read(d.readFile, binary.BigEndian, &msgSize) | |
if err != nil {d.readFile.Close() | |
d.readFile = nil | |
return nil, err | |
} | |
readBuf := make([]byte, msgSize) | |
_, err = d.readFile.Read(readBuf) | |
if err != nil {return nil, err} | |
d.readPos += int64(msgSize + 4) | |
return readBuf, nil | |
} | |
func (d *DiskQueue) writeOne(msg []byte) (err error) { | |
var buf bytes.Buffer | |
if d.writePos > maxFileSize { | |
d.writeFileNum++ | |
d.writePos = 0 | |
d.writeFile.Close() | |
d.writeFile = nil | |
if err = d.persistMetaData(); err != nil {return} | |
} | |
if d.writeFile == nil {d.writeFile, err = os.OpenFile(d.fileName(d.writeFileNum), os.O_RDWR|os.O_CREATE, 0600) | |
if err != nil {return} | |
if d.writePos > 0 {_, err = d.writeFile.Seek(d.writePos, 0) | |
if err != nil {return} | |
} | |
} | |
dataLen := len(msg) | |
err = binary.Write(&buf, binary.BigEndian, dataLen) | |
if err != nil {return} | |
_, err = buf.Write(msg) | |
if err != nil {return} | |
_, err = d.writeFile.Write(buf.Bytes()) | |
if err != nil {d.writeFile.Close() | |
d.writeFile = nil | |
return | |
} | |
d.writePos += int64(dataLen + 4) | |
return | |
} |
事件调度
后盾队列的事件调度和 channel、topic 的逻辑一模一样,就是裸露的管道操作方法加上后盾的 for + select 监听组合:
func (d *DiskQueue) Get() ([]byte, error) { | |
ret := <-d.outChan | |
return ret.Variable.([]byte), ret.Err | |
} | |
func (d *DiskQueue) Put(bytes []byte) error {errChan := make(chan interface{}) | |
d.inChan <- util.ChanReq{ | |
Variable: bytes, | |
RetChan: errChan, | |
} | |
err, _ := (<-errChan).(error) | |
return err | |
} | |
func (d *DiskQueue) ReadReadyChan() chan struct{} {return d.readChan} | |
func (d *DiskQueue) Close() error {errChan := make(chan interface{}) | |
d.exitChan <- util.ChanReq{RetChan: errChan,} | |
err, _ := (<-errChan).(error) | |
return err | |
} | |
func (d *DiskQueue) router() { | |
for {if d.hasDataToRead() { | |
select { | |
// in order to read only when we actually want a message | |
case d.readChan <- struct{}{}: | |
msg, err := d.readOne() | |
d.outChan <- util.ChanRet{ | |
Err: err, | |
Variable: msg, | |
} | |
case writeRequest := <-d.inChan: | |
err := d.writeOne(writeRequest.Variable.([]byte)) | |
writeRequest.RetChan <- err | |
case closeReq := <-d.exitChan: | |
if d.readFile != nil {d.readFile.Close() | |
} | |
if d.writeFile != nil {d.writeFile.Close() | |
} | |
closeReq.RetChan <- d.persistMetaData() | |
return | |
} | |
} else { | |
select { | |
case writeRequest := <-d.inChan: | |
err := d.writeOne(writeRequest.Variable.([]byte)) | |
writeRequest.RetChan <- err | |
case closeReq := <-d.exitChan: | |
if d.readFile != nil {d.readFile.Close() | |
} | |
if d.writeFile != nil {d.writeFile.Close() | |
} | |
closeReq.RetChan <- d.persistMetaData() | |
return | |
} | |
} | |
} | |
} |
Get 办法就是读取音讯,Put 是发送音讯,Close 是发送敞开信号。值得一提的是 ReadReadyChan 办法,它返回的是 readChan 这个管道,那么这个管道有什么用?答案是 readChan 是为了确保当真正有读取后盾队列需要的时候才往 outChan 发送数据,毕竟咱们读取数据应该用拉模式,而不是一旦有数据可读就发送的推模式。
使用后盾队列
咱们的后盾队列设计好后,就能够在之前 channel 和 topic 的设计中学以致用防止失落数据的问题,次要批改的中央有三处:
- 接管音讯时,当 msgChan 缓冲区已满,写入后盾队列
- 推送音讯时,当 msgChan 无数据但后盾队列有数据时,读取队列组装音讯发送
- 敞开 channel 和 topic 的时候,一并将后盾队列敞开
须要改变的中央不多,这里就不贴代码了,本篇的残缺代码能够参考:SMQ。
到这里咱们的外围组件大抵设计实现了,下一篇咱们就来实现收尾工作。