1. 生产者

运行Nsq服务集群

首先启动nsqlookud,在一个shell中,开始nsqlookupd:

$ nsqlookupd

在另一个shell中,开始nsqd:

$ nsqd —lookupd-tcp-address=127.0.0.1:4160

在另一个shell中,开始nsqadmin:

$ nsqadmin —lookupd-http-address=127.0.0.1:4161

发布初始消息(也在集群中创建主题):

$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

最后,在另一个shell中,开始nsq_to_file:

$ nsq_to_file —topic=test —output-dir=/tmp —lookupd-http-address=127.0.0.1:4161

验证事物按预期工作,在Web浏览器中打开http://127.0.0.1:4171/ 以查看nsqadminUI并查看统计信息。另外,检查test.*.log写入的日志文件()的内容/tmp。

链接nsq 并创建生产者:

  1. package main
  2. import (
  3. "fmt"
  4. nsq "github.com/nsqio/go-nsq"
  5. )
  6. func main() {
  7. // 定义nsq生产者
  8. var producer *nsq.Producer
  9. // 初始化生产者
  10. // producer, err := nsq.NewProducer("地址:端口", nsq.*Config )
  11. producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
  12. if err != nil {
  13. panic(err)
  14. }
  15. err = producer.Ping()
  16. if nil != err {
  17. // 关闭生产者
  18. producer.Stop()
  19. producer = nil
  20. }
  21. fmt.Println("ping nsq success")
  22. }

生产者创建topic并写入nsq:

  1. package main
  2. import (
  3. "fmt"
  4. nsq "github.com/nsqio/go-nsq"
  5. )
  6. func main() {
  7. // 定义nsq生产者
  8. var producer *nsq.Producer
  9. // 初始化生产者
  10. // producer, err := nsq.NewProducer("地址:端口", nsq.*Config )
  11. producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
  12. if err != nil {
  13. panic(err)
  14. }
  15. err = producer.Ping()
  16. if nil != err {
  17. // 关闭生产者
  18. producer.Stop()
  19. producer = nil
  20. }
  21. // 生产者写入nsq,10条消息,topic = "test"
  22. topic := "test"
  23. for i := 0; i < 10; i++ {
  24. message := fmt.Sprintf("message:%d", i)
  25. if producer != nil && message != "" { //不能发布空串,否则会导致error
  26. err = producer.Publish(topic, []byte(message)) // 发布消息
  27. if err != nil {
  28. fmt.Printf("producer.Publish,err : %v", err)
  29. }
  30. fmt.Println(message)
  31. }
  32. }
  33. fmt.Println("producer.Publish success")
  34. }