| 
                        副标题[/!--empirenews.page--]
                        技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战
               
在我前面的博客文章 “我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。 
在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。 
如果你有时间去看,我将这个博客文章的整个过程录制到 这个视频中了 :) 
 
下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。 
  
rest-kafka-mongo-microservice-draw-io 
微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。 
微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。 
在你继续之前,我们需要能够去运行这些微服务的几件东西: 
    - 下载 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
 
    - 安装 librdkafka —— 不幸的是,这个库应该在目标系统中
 
    - 安装 Kafka Go 客户端
 
    - 运行 MongoDB。你可以去看我的 以前的文章 中关于这一块的内容,那篇文章中我使用了一个 MongoDB docker 镜像。
 
 
我们开始吧! 
首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例: 
$ cd /<download path>/kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
  
接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。 
$ bin/kafka-server-start.sh config/server.properties
  
Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。 
version: '3'services:  mongodb:    image: mongo    ports:      - "27017:27017"    volumes:      - "mongodata:/data/db"    networks:      - network1-  
 volumes:   mongodata:-  
 networks:   network1:
  
使用 Docker Compose 去运行 MongoDB docker 容器。 
docker-compose up
  
这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB: 
rest-to-kafka/rest-kafka-sample.go 
func jobsPostHandler(w http.ResponseWriter, r *http.Request) {-  
     //Retrieve body from http request    b, err := ioutil.ReadAll(r.Body)    defer r.Body.Close()    if err != nil {        panic(err)    }-  
     //Save data into Job struct    var _job Job    err = json.Unmarshal(b, &_job)    if err != nil {        http.Error(w, err.Error(), 500)        return    }-  
     saveJobToKafka(_job)-  
     //Convert job struct into json    jsonString, err := json.Marshal(_job)    if err != nil {        http.Error(w, err.Error(), 500)        return    }-  
     //Set content-type http header    w.Header().Set("content-type", "application/json")-  
     //Send back data as response    w.Write(jsonString)-  
 }-  
 func saveJobToKafka(job Job) {-  
     fmt.Println("save to kafka")-  
     jsonString, err := json.Marshal(job)-  
     jobString := string(jsonString)    fmt.Print(jobString)-  
     p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})    if err != nil {        panic(err)    }-  
     // Produce messages to topic (asynchronously)    topic := "jobs-topic1"    for _, word := range []string{string(jobString)} {        p.Produce(&kafka.Message{            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},            Value:          []byte(word),        }, nil)    }}
  
这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据: 
                                                (编辑:91站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |