go-micro V2 从零开始(三)消息的订阅和发布

本文相关代码:gitee

前言

上一章,我们已经完成了todolist系统的核心服务task-srv,能够顺利实现任务的增删改查等操作。

现在假设我们打算在task-srv服务的基础上,拓展更多丰富的功能,利如:

引入成就系统,用户每完成一项任务成就点+1,记录用户达成1,100,1000成就点的时间,给与相应成就 引入邮件服务,用户每完成一项任务将任务详情发送到用户指定邮箱 引入数据分析服务,用户每完成一项任务根据完成情况做数据统计

等等等等,你可以随意发挥自己的想象……

这个时候我们可以在task-srv服务中修改handler代码,调用上述新服务,为了性能我们甚至可以借助携程并发完成上述请求,但是这意味着每当我们有了新的点子,就必须修改task-srv服务,将接口调用硬编码到服务中,这种业务代码的耦合是我们受不了的。

幸好我们有消息队列,俗话说:“消息队列有三好,削峰、稳流和解耦”。传统面向接口编程是以接口为媒介,实现调用接口者和接口实现者之间的解耦,但是这种解耦程度不是很高,如果接口发生变化,双方代码都需要变动,而事件驱动则是调用者和被调用者互相不知道对方,两者只和中间消息队列耦合。

这一章,我们先改造task-srv服务,通过消息来实现成就系统的事件驱动,然后开发一个简单的成就系统achievement-srv(篇幅原因上面的那些点子就不一一实现了)

具体步骤

一、改造 task-srv

注意下面都是代码片段,不要整体复制

1.1 改造 proto

既然都加入成就系统了,我们顺便就加入多用户支持,修改task.proto文件,给task增加userId字段:

message Task { //每条任务的ID,本项目中对应mongodb记录的"_id"字段 //@inject_tag: bson:"_id" string id = 1; //任务主体文字 //@inject_tag: bson:"body" string body = 2; //用户设定的任务开始时间戳 //@inject_tag: bson:"startTime" int64 startTime = 3; //用户设定的任务截止时间戳 //@inject_tag: bson:"endTime" int64 endTime = 4; //任务是否已完成 //@inject_tag: bson:"isFinished" int32 isFinished = 5; //用户实际完成时间戳 //@inject_tag: bson:"finishTime" int64 finishTime = 6; //任务创建时间 //@inject_tag: bson:"createTime" int64 createTime = 7; //任务修改时间 //@inject_tag: bson:"updateTime" int64 updateTime = 8; //用户ID //@inject_tag: bson:"userId" string userId=9; }

本章内容我们暂时不创建user结构,留待后续讲到鉴权时再做

1.2 改造 repository

我们规定,在用户调用Finished接口时,向消息队列发送任务信息。

... type TaskRepository interface { InsertOne(ctx context.Context, task *pb.Task) error Delete(ctx context.Context, id string) error Modify(ctx context.Context, task *pb.Task) error Finished(ctx context.Context, task *pb.Task) error Count(ctx context.Context, keyword string) (int64, error) Search(ctx context.Context, req *pb.SearchRequest) ([]*pb.Task, error) // 接口新增方法 FindById(ctx context.Context, id string) (*pb.Task, error) } ... func (repo *TaskRepositoryImpl) InsertOne(ctx context.Context, task *pb.Task) error { _, err := repo.collection().InsertOne(ctx, bson.M{ "body": task.Body, "startTime":task.StartTime, "endTime":task.EndTime, "isFinished": UnFinished, "createTime": time.Now().Unix(), // 插入新任务时增加userId "userId": task.UserId, }) return err } // 通过ID查询task信息 func (repo *TaskRepositoryImpl) FindById(ctx context.Context, id string) (*pb.Task, error) { objectId, err := primitive.ObjectIDFromHex(id) if err != nil { return nil, errors.WithMessage(err, "parse ID") } result := repo.collection().FindOne(ctx, bson.M{"_id": objectId}) task := &pb.Task{} if err := result.Decode(task); err != nil { return nil, errors.WithMessage(err, "search mongo") } return task, nil } ...

1.3 改造 handler

修改/task-srv/handler/task.go:

... const ( // 任务完成消息的topic TaskFinishedTopic = "task.finished" ) type TaskHandler struct { TaskRepository repository.TaskRepository // 由go-micro封装,用于发送消息的接口,老版本叫micro.Publisher TaskFinishedPubEvent micro.Event } ... func (t *TaskHandler) Create(ctx context.Context, req *pb.Task, resp *pb.EditResponse) error { // 创建任务接口增加userId必填校验 if req.Body == "" || req.StartTime <= 0 || req.EndTime <= 0 || req.UserId == "" { return errors.New("bad param") } if err := t.TaskRepository.InsertOne(ctx, req); err != nil { return err } resp.Msg = "success" return nil } ... func (t *TaskHandler) Finished(ctx context.Context, req *pb.Task, resp *pb.EditResponse) error { if req.Id == "" || req.IsFinished != repository.UnFinished && req.IsFinished != repository.Finished { return errors.New("bad param") } if err := t.TaskRepository.Finished(ctx, req); err != nil { return err } resp.Msg = "success" // 发送task完成消息 // 由于以下都是主业务之外的增强功能,出现异常只记录日志,不影响主业务返回 if task, err := t.TaskRepository.FindById(ctx, req.Id); err != nil { log.Print("[error]cant send \"task finished\" message. ", err) } else { if err = t.TaskFinishedPubEvent.Publish(ctx, task); err != nil { log.Print("[error]cant send \"task finished\" message. ", err) } } return nil } ...

1.4 改造 main

修改/task-srv/main.go,在注册handler时,加入消息发送实例:

... taskHandler := &handler.TaskHandler{ TaskRepository: &repository.TaskRepositoryImpl{ Conn: conn, }, // 注入消息发送实例,为避免消息名冲突,这里的topic我们用服务名+自定义消息名拼出 TaskFinishedPubEvent: micro.NewEvent("go.micro.service.task."+handler.TaskFinishedTopic, service.Client()), } if err := pb.RegisterTaskServiceHandler(service.Server(), taskHandler); err != nil { log.Fatal(errors.WithMessage(err, "register server")) } ...

以上,便完成了Finished接口的改造,接下来我们要实现消息的消费。

二、编写成就服务 achievement-srv

2.1 创建目录

首先,我们在go-todolist目录下新建achievement-srv目录,他将是一个完全独立的项目结构:

> mkdir achievement-srv && cd achievement-srv > mkdir repository subscriber

这是一个完全由消息队列事件驱动的服务,因此我们创建了subscriber文件夹,而没有创建handler文件夹。 由于我们继续使用task.proto中定义的task作为消息传递对象,这里不需要再编写新的proto文件。 在这个项目中为了简单我们把task直接定义再task-srv项目中,实际开发中建议将这种多个服务共用的消息体定义在公共proto文件中。 此时go-todolist项目的完整结构如下图(根目录的编号2是为了和文章进度对应):

2.2 数据库操作

新建并编辑achievement-srv/repository/achievement.go

package repository import ( "context" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "time" ) const ( // 默认数据库名 DbName = "todolist" // 默认表名 TaskCollection = "achievement" ) // 数据库的成就记录表结构 type Achievement struct { Id string `bson:"_id,omitempty"` // 用户ID UserId string `bson:"userId"` // 完成任务总数 Total int64 `bson:"total"` // 完成第一个任务的时间 Finished1Time int64 `bson:"finished1Time"` // 完成第一百个任务的时间 Finished100Time int64 `bson:"finished100Time"` // 完成第一千个任务的时间 Finished1000Time int64 `bson:"finished1000Time"` // 更新时间 UpdateTime int64 `bson:"updateTime"` } // 因为只是演示,这里我们定义查询和保存 type AchievementRepo interface { FindByUserId(ctx context.Context, userId string) (*Achievement, error) Insert(ctx context.Context, achievement *Achievement) error Update(ctx context.Context, achievement *Achievement) error } type AchievementRepoImpl struct { Conn *mongo.Client } func (repo *AchievementRepoImpl) collection() *mongo.Collection { return repo.Conn.Database(DbName).Collection(TaskCollection) } func (repo *AchievementRepoImpl) FindByUserId(ctx context.Context, userId string) (*Achievement, error) { result := repo.collection().FindOne(ctx, bson.M{"userId": userId}) // findOne如果查不到是会报错的,这里要处理一下 if result.Err() == mongo.ErrNoDocuments { return nil, nil } achievement := &Achievement{} if err := result.Decode(achievement); err != nil { return nil, errors.WithMessage(err, "search mongo") } return achievement, nil } func (repo *AchievementRepoImpl) Insert(ctx context.Context, achievement *Achievement) error { _, err := repo.collection().InsertOne(ctx, achievement) return err } func (repo *AchievementRepoImpl) Update(ctx context.Context, achievement *Achievement) error { achievement.UpdateTime = time.Now().Unix() oid, err := primitive.ObjectIDFromHex(achievement.Id) if err != nil { return err } achievement.Id = "" _, err = repo.collection().UpdateOne(ctx, bson.M{"_id": oid}, bson.M{"$set": achievement}) return err }

我们首先定义了成就的数据库结构,然后是他的数据库方法,因为只是演示,这里我们只定义查询和保存方法。

2.3 业务实现

创建并编辑achievement-srv/subscriber/achievement.go,这里我们又可以参考以下之前的hello-srv官方demo写法

package subscriber import ( "context" "github.com/pkg/errors" "go-todolist/achievement-srv/repository" pb "go-todolist/task-srv/proto/task" "log" "strings" "time" ) // 定时实现类 type AchievementSub struct { Repo repository.AchievementRepo } // 只处理任务完成这一个事件 func (sub *AchievementSub) Finished(ctx context.Context, task *pb.Task) error { log.Printf("Handler Received message: %v\n", task) if task.UserId == "" || strings.TrimSpace(task.UserId) == "" { return errors.New("userId is blank") } entity, err := sub.Repo.FindByUserId(ctx, task.UserId) if err != nil { return err } now := time.Now().Unix() if entity == nil { entity = &repository.Achievement{ UserId:task.UserId, Total: 1, Finished1Time: now, } return sub.Repo.Insert(ctx, entity) } entity.Total++ switch entity.Total { case 100: entity.Finished100Time = now case 1000: entity.Finished1000Time = now } return sub.Repo.Update(ctx, entity) }

2.4 注册服务

创建并编辑achievement-srv/main.go

package main import ( "context" "github.com/micro/go-micro/v2" "github.com/pkg/errors" "go-todolist/achievement-srv/repository" "go-todolist/achievement-srv/subscriber" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "log" "time" ) // 这里是我内网的mongo地址,请根据你得实际情况配置,推荐使用dockers部署 const MONGO_URI = "mongodb://172.18.0.58:27017" // task-srv服务 func main() { // 在日志中打印文件路径,便于调试代码 log.SetFlags(log.Llongfile) conn, err := connectMongo(MONGO_URI, time.Second) if err != nil { log.Fatal(err) } defer conn.Disconnect(context.Background()) // New Service service := micro.NewService( micro.Name("go.micro.service.achievement"), micro.Version("latest"), ) // Initialise service service.Init() // Register Handler handler := &subscriber.AchievementSub{ Repo: &repository.AchievementRepoImpl{ Conn: conn, }, } // 这里的topic注意与task-srv注册的要一致 if err := micro.RegisterSubscriber("go.micro.service.task.finished", service.Server(), handler); err != nil { log.Fatal(errors.WithMessage(err, "subscribe")) } // Run service if err := service.Run(); err != nil { log.Fatal(errors.WithMessage(err, "run server")) } } // 连接到MongoDB func connectMongo(uri string, timeout time.Duration) (*mongo.Client, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) if err != nil { return nil, errors.WithMessage(err, "create mongo connection session") } return client, nil }

三、运行并校验

3.1 运行

分别在两个命令行窗口执行如下命令:

> cd task-srv > go run main.go 2020-09-17 19:29:[email protected]/service.go:200 level=info Starting [service] go.micro.service.task 2020-09-17 19:29:54file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:61401 2020-09-17 19:29:54file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: go.micro.service.task-391e277c-9804-48b4-b17f-75bffdafcaef > cd achievement-srv > go run main.go 2020-09-17 19:28:[email protected]/service.go:200 level=info Starting [service] go.micro.service.achievement 2020-09-17 19:28:17file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:61378 2020-09-17 19:28:17file=grpc/grpc.go:881 level=info Broker [http] Connected to 127.0.0.1:61379 2020-09-17 19:28:17file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: go.micro.service.achievement-f3ac123c-e44c-4afa-87a7-a 2020-09-17 19:28:17file=grpc/grpc.go:730 level=info Subscribing to topic: go.micro.service.task.finished

这样两个服务就都启动了,可以看到我们的achievement-srv服务订阅了消息go.micro.service.task.finished。

3.2 校验

这里我们仍使用之前的task-cli.go调用task-srv,不过因为之前加入了userId的必填校验,这里我们稍微修改以下这个测试文件,随便写一个userId:

package main import ( "context" "github.com/micro/go-micro/v2" pb "go-todolist/task-srv/proto/task" "go-todolist/task-srv/repository" "log" "time" ) // 模拟client调用task-srv服务 func main() { // 在日志中打印文件路径,便于调试代码 log.SetFlags(log.Llongfile) // 客户端也注册为服务 server := micro.NewService(micro.Name("go.micro.client.task")) server.Init() taskService := pb.NewTaskService("go.micro.service.task", server.Client()) // 调用服务生成三条任务 now := time.Now() insertTask(taskService, "完成学习笔记(一)", now.Unix(), now.Add(time.Hour*24).Unix()) insertTask(taskService, "完成学习笔记(二)", now.Add(time.Hour*24).Unix(), now.Add(time.Hour*48).Unix()) insertTask(taskService, "完成学习笔记(三)", now.Add(time.Hour*48).Unix(), now.Add(time.Hour*72).Unix()) // 分页查询任务列表 page, err := taskService.Search(context.Background(), &pb.SearchRequest{ PageCode: 1, PageSize: 20, }) if err != nil { log.Fatal("search1", err) } log.Println(page) // 更新第一条记录为完成 row := page.Rows[0] if _, err = taskService.Finished(context.Background(), &pb.Task{ Id: row.Id, IsFinished: repository.Finished, }); err != nil { log.Fatal("finished", row.Id, err) } // 再次分页查询,校验修改结果 page, err = taskService.Search(context.Background(), &pb.SearchRequest{}) if err != nil { log.Fatal("search2", err) } log.Println(page) } func insertTask(taskService pb.TaskService, body string, start, end int64) { _, err := taskService.Create(context.Background(), &pb.Task{ // 这里先随便输入一个userId UserId: "10000", Body:body, StartTime: start, EndTime: end, }) if err != nil { log.Fatal("create", err) } log.Println("create task success! ") }

执行go run task-cli.go,日志与上次一致。这时候检查数据库已经出现了achievement表,并插入了一条记录。

四、消息处理机制

完成了整个消息处理后,我们回过头来再看看之前的代码,细心的读者可能会发现,在2.3 业务实现中我们自定义了一个消息处理结构体AchievementSub,他有一个方法Finished(ctx context.Context, task *pb.Task) error是我们用来处理消息的业务代码。

但在2.3注册服务,我们只是将一个新建的AchievementSub对象传入RegisterTaskServiceHandler()方法,并没有指定具体的业务处理方法Finished。

4.1 处理方法的参数返回值结构

如果AchievementSub定义了多个方法会怎么样呢:

... func (sub *AchievementSub) Finished(ctx context.Context, task *pb.Task) error { // 添加一个方法名输出 log.Println("Finished1") ... } // 这个方法保持和Finished方法一致的参数和返回值 func (sub *AchievementSub) Finished2(ctx context.Context, task *pb.Task) error { log.Println("Finished2") return nil } // 这个方法去掉了返回值 func (sub *AchievementSub) Finished3(ctx context.Context, task *pb.Task) { log.Println("Finished3") return nil } ...

重启achievement-srv,再次用task-cli.go触发服务,我们会在achievement-srv的日志中看到如下内容:

subscribe: subscriber AchievementSub.Finished3 has wrong number of outs: 0 require signature func(context.Context, interface{}) error

说明AchievementSub中定义的方法都必须保持统一的参数和返回值结构。

4.2 处理方法的调用顺序

给FInished3补上error返回值后再次尝试:

2020-09-18 10:06:10. I | Finished3 2020-09-18 10:06:10. I | Finished2 2020-09-18 10:06:10. I | Finished1

go-micro依次调用了AchievementSub的每一个处理方法,如果你多尝试几次,还会发现他们的调用顺序是乱序的。

4.3 处理方法的异常处理

如果Finishied2方法抛出异常,整个调用链会被打断吗?

... func (sub *AchievementSub) Finished2(ctx context.Context, task *pb.Task) error { log.Println("Finished2") return errors.New("break") } ...

再次重启服务并调用task-cli,很遗憾,调用并未被打断:

2020-09-18 10:09:54. I | Finished2 2020-09-18 10:09:54. I | Finished3 2020-09-18 10:09:54. I | Finished1

那么这些方法抛出的异常去哪里了呢?这里篇幅原因就不再做实验了,直接看源码。 熟悉消息队列的朋友应该能猜到,通过阅读go-micro/v2/server/handler.go的代码,订阅相关的配置项结构体SubscriberOptions有一个属性AutoAck:

type SubscriberOptions struct { // AutoAck defaults to true. When a handler returns // with a nil error the message is acked. AutoAckbool Queuestring Internal bool Contextcontext.Context }

官方注释已经写的很明白,他会在在处理handler返回error==nil时自动动应答(关于ack应答可以参阅nats等消息中间件的相关知识)。

4.4 直接指定处理方法

再回到achievement-srv/main.go文件,观察micro.RegisterSubscriber()方法,第三个参数handler其实是个interface{},他既可以传入一个对象,也可以直接传入方法(方法参数和返回值结构同上),当我们直接传入具体方法时,就会使用我们指定的方法处理业务:

... if err := micro.RegisterSubscriber("go.micro.service.task.finished", service.Server(), handler.Finished); err != nil { log.Fatal(errors.WithMessage(err, "subscribe")) } ... 2020-09-18 10:09:54. I | Finished1

总结

这一章,我们开发了achievement-srv服务,并通过消息订阅的方式实现了服务调用,做到了业务解耦。 请注意,这只是一个简化的使用演示,并不能很好体现出事件驱动的开发优势,而且如果仔细考虑业务逻辑,存在同一个任务反复调用完成接口刷成就的逻辑漏洞,整个项目仅供参考。 下一章,我们告别繁琐的task-cli.go方式调用,编写web api服务对外暴露http接口。