| 
                         kafka-to-mongo/kafka-mongo-sample.go 
func main() {-  
     //Create MongoDB session    session := initialiseMongo()    mongoStore.session = session-  
     receiveFromKafka()-  
 }-  
 func receiveFromKafka() {-  
     fmt.Println("Start receiving from Kafka")    c, err := kafka.NewConsumer(&kafka.ConfigMap{        "bootstrap.servers": "localhost:9092",        "group.id":          "group-id-1",        "auto.offset.reset": "earliest",    })-  
     if err != nil {        panic(err)    }-  
     c.SubscribeTopics([]string{"jobs-topic1"}, nil)-  
     for {        msg, err := c.ReadMessage(-1)-  
         if err == nil {            fmt.Printf("Received from Kafka %s: %sn", msg.TopicPartition, string(msg.Value))            job := string(msg.Value)            saveJobToMongo(job)        } else {            fmt.Printf("Consumer error: %v (%v)n", err, msg)            break        }    }-  
     c.Close()-  
 }-  
 func saveJobToMongo(jobString string) {-  
     fmt.Println("Save to MongoDB")    col := mongoStore.session.DB(database).C(collection)-  
     //Save data into Job struct    var _job Job    b := []byte(jobString)    err := json.Unmarshal(b, &_job)    if err != nil {        panic(err)    }-  
     //Insert job into MongoDB    errMongo := col.Insert(_job)    if errMongo != nil {        panic(errMongo)    }-  
     fmt.Printf("Saved to MongoDB : %s", jobString)-  
 }
                          (编辑:91站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |