NSQ简介

NSQ 是一个实时分布式音讯平台,旨在大规模运行,每天解决数十亿条音讯。

它提倡没有单点故障的分布式和分散式拓扑构造,实现容错和高可用性,同时保障牢靠的消息传递。请看特点和保障。

在操作上,NSQ很容易配置和部署(所有参数都在命令行上指定,编译的二进制文件没有运行时的依赖性)。为了取得最大的灵活性,它与数据格式无关(音讯能够是JSON、MsgPack、协定缓冲区或其余任何模式)。官网的Go和Python库是开箱即用的(还有许多其余的客户端库),如果你有趣味建设本人的库,有一个协定标准。

特点:

  • 谋求简略部署
  • 谋求高可用、防止单点故障、无核心设计
  • 确保音讯送达
  • 生产者消费者主动发现、消费者连贯所有生产者、向消费者推的模式
  • 提供 HTTP 接口
  • 提供简直所有编程语言的客户端开发包

NSQ是由出名短链接服务商bitly用Go语言开发的实时音讯解决零碎,具备高性能、高牢靠、忽视单点故障等长处,是一个十分不错的新兴的音讯队列解决方案。

源代码地址:https://github.com/nsqio/nsq

下载和装置nsq

官网装置地址:https://nsq.io/overview/quick...

官网下载地址:https://nsq.io/deployment/ins...

#创立目录,并进入mkdir -p /data/softwarecd /data/software#下载nsq包wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz#解压tar -zxvf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz#复制到指定目录cp -r /data/software/nsq-0.3.8.linux-amd64.go1.6.2 /data/beyond/nsq

启动时可配置文件的参考地址:
https://liushuchun.gitbooks.i...

启动相干服务

nohup /data/beyond/nsq/bin/nsqd &#(守护过程;接管,缓存和投递音讯给客户端)  如:nsqd -config=/home/nsq/bin/nsqd.cfgnohup /data/beyond/nsq/bin/nsqlookupd &#(守护过程;为消费者提供运行时发现服务,来查找指定话题(topic)的生产者 nsqd) nohup /data/beyond/nsq/bin/nsqadmin &#(提供 Web 页面用来实时的治理你的 NSQ 集群。它通过和 nsqlookupd 实例交换,来确定生产者)

指定端口启动
nohup /data/beyond/nsq/bin/nsqadmin --lookupd-http-address localhost:4160 &

指定配置文件启动
nohup /data/beyond/nsq/bin/nsqlookupd -config=/data/beyond/nsq/conf/nsqlookupd.cfg &

nohup /data/beyond/nsq/bin/nsqadmin -config=/data/beyond/nsq/conf/nsqadmin.cfg &

nohup /data/beyond/nsq/bin/nsqd -config=/data/beyond/nsq/conf/nsqd.cfg &

集群治理
记得谁说过,go调用nsq时,只能在代码里配置多个地址,去轮询调用

测试环境的配置,如:192.168.1.6

nsqd.cfg文件
cat nsqd.cfg#配置如下## enable verbose loggingverbose = false## unique identifier (int) for this worker (will default to a hash of hostname)id = 66## <addr>:<port> to listen on for TCP clientstcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)broadcast_address = "192.168.1.6"## cluster of nsqlookupd TCP addressesnsqlookupd_tcp_addresses = [    "192.168.1.6:4160"]## duration to wait before HTTP client connection timeouthttp_client_connect_timeout = "2s"## duration to wait before HTTP client request timeouthttp_client_request_timeout = "5s"## path to store disk-backed messages# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)mem_queue_size = 10000## number of bytes per diskqueue file before rollingmax_bytes_per_file = 104857600## number of messages per diskqueue fsyncsync_every = 2500## duration of time per diskqueue fsync (time.Duration)sync_timeout = "2s"## duration to wait before auto-requeing a messagemsg_timeout = "60s"## maximum duration before a message will timeoutmax_msg_timeout = "15m"## maximum size of a single message in bytesmax_msg_size = 1024768## maximum requeuing timeout for a messagemax_req_timeout = "1h"## maximum size of a single command bodymax_body_size = 5123840## maximum client configurable duration of time between client heartbeatsmax_heartbeat_interval = "60s"## maximum RDY count for a clientmax_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffermax_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)statsd_interval = "60s"## toggle sending memory and GC stats to statsdstatsd_mem_stats = true## message processing time percentiles to keep track of (float)e2e_processing_latency_percentiles = [    100.0,    99.0,    95.0]## calculate end to end latency quantiles for this duration of time (time.Duration)e2e_processing_latency_window_time = "10m"## path to certificate filetls_cert = ""## path to private key filetls_key = ""## set policy on client certificate (require - client must provide certificate,##  require-verify - client must provide verifiable signed certificate)# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority# tls_root_ca_file = ""## require client TLS upgradestls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")tls_min_version = ""## enable deflate feature negotiation (client compression)deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)max_deflate_level = 6## enable snappy feature negotiation (client compression)snappy = true
nsqlookupd.cfg文件
cat nsqlookupd.cfg#配置如下## enable verbose loggingverbose = false## <addr>:<port> to listen on for TCP clientstcp_address = "0.0.0.0:4160"## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4161"## address that will be registered with lookupd (defaults to the OS hostname)broadcast_address = "192.168.1.6"## duration of time a producer will remain in the active list since its last pinginactive_producer_timeout = "300s"## duration of time a producer will remain tombstoned if registration remainstombstone_lifetime = "45s"
nsqadmin.cfg文件
cat nsqadmin.cfg#配置如下## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4171"## graphite HTTP addressgraphite_url = ""## proxy HTTP requests to graphiteproxy_graphite = false## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)statsd_prefix = "nsq.%s"## format of statsd counter statsstatsd_counter_format = "stats.counters.%s.count"## format of statsd gauge statsstatsd_gauge_format = "stats.gauges.%s"## time interval nsqd is configured to push to statsd (must match nsqd)statsd_interval = "60s"## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sentnotification_http_endpoint = ""## nsqlookupd HTTP addressesnsqlookupd_http_addresses = [    "192.168.1.6:4161"]## nsqd HTTP addresses (optional)#nsqd_http_addresses = [#    "192.168.1.6:4151",#    "192.168.1.7:4151",#]

测试环境的配置集群,第二台,如:192.168.1.7

留神: nsqlookupd_tcp_addresses 地址配置的是 192.168.1.6

nsqd.cfg文件
cat nsqd.cfg#配置如下## enable verbose loggingverbose = false## unique identifier (int) for this worker (will default to a hash of hostname)id = 77## <addr>:<port> to listen on for TCP clientstcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)broadcast_address = "192.168.1.7"## cluster of nsqlookupd TCP addressesnsqlookupd_tcp_addresses = [    "192.168.1.6:4160"]## duration to wait before HTTP client connection timeouthttp_client_connect_timeout = "2s"## duration to wait before HTTP client request timeouthttp_client_request_timeout = "5s"## path to store disk-backed messages# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)mem_queue_size = 10000## number of bytes per diskqueue file before rollingmax_bytes_per_file = 104857600## number of messages per diskqueue fsyncsync_every = 2500## duration of time per diskqueue fsync (time.Duration)sync_timeout = "2s"## duration to wait before auto-requeing a messagemsg_timeout = "60s"## maximum duration before a message will timeoutmax_msg_timeout = "15m"## maximum size of a single message in bytesmax_msg_size = 1024768## maximum requeuing timeout for a messagemax_req_timeout = "1h"## maximum size of a single command bodymax_body_size = 5123840## maximum client configurable duration of time between client heartbeatsmax_heartbeat_interval = "60s"## maximum RDY count for a clientmax_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffermax_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)statsd_interval = "60s"## toggle sending memory and GC stats to statsdstatsd_mem_stats = true## message processing time percentiles to keep track of (float)e2e_processing_latency_percentiles = [    100.0,    99.0,    95.0]## calculate end to end latency quantiles for this duration of time (time.Duration)e2e_processing_latency_window_time = "10m"## path to certificate filetls_cert = ""## path to private key filetls_key = ""## set policy on client certificate (require - client must provide certificate,##  require-verify - client must provide verifiable signed certificate)# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority# tls_root_ca_file = ""## require client TLS upgradestls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")tls_min_version = ""## enable deflate feature negotiation (client compression)deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)max_deflate_level = 6## enable snappy feature negotiation (client compression)snappy = true

其余环境的配置,如: 172.28.15.1

nsqd.cfg 文件
cat nsqd.cfg #配置如下## enable verbose loggingverbose = false## unique identifier (int) for this worker (will default to a hash of hostname)id = 66 ## <addr>:<port> to listen on for TCP clientstcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)broadcast_address = "172.28.15.1"## cluster of nsqlookupd TCP addressesnsqlookupd_tcp_addresses = [    "172.28.15.1:4160"]## duration to wait before HTTP client connection timeouthttp_client_connect_timeout = "2s"## duration to wait before HTTP client request timeouthttp_client_request_timeout = "5s"## path to store disk-backed messages# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)mem_queue_size = 10000## number of bytes per diskqueue file before rollingmax_bytes_per_file = 104857600## number of messages per diskqueue fsyncsync_every = 2500## duration of time per diskqueue fsync (time.Duration)sync_timeout = "2s"## duration to wait before auto-requeing a messagemsg_timeout = "60s"## maximum duration before a message will timeoutmax_msg_timeout = "15m"## maximum size of a single message in bytesmax_msg_size = 1024768## maximum requeuing timeout for a messagemax_req_timeout = "1h"## maximum size of a single command bodymax_body_size = 5123840## maximum client configurable duration of time between client heartbeatsmax_heartbeat_interval = "60s"## maximum RDY count for a clientmax_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffermax_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)statsd_interval = "60s"## toggle sending memory and GC stats to statsdstatsd_mem_stats = true## message processing time percentiles to keep track of (float)e2e_processing_latency_percentiles = [    100.0,    99.0,    95.0]## calculate end to end latency quantiles for this duration of time (time.Duration)e2e_processing_latency_window_time = "10m"## path to certificate filetls_cert = ""## path to private key filetls_key = ""## set policy on client certificate (require - client must provide certificate,##  require-verify - client must provide verifiable signed certificate)# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority# tls_root_ca_file = ""## require client TLS upgradestls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")tls_min_version = ""## enable deflate feature negotiation (client compression)deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)max_deflate_level = 6## enable snappy feature negotiation (client compression)snappy = true
nsqlookupd.cfg文件
cat nsqlookupd.cfg#配置如下## enable verbose loggingverbose = false## <addr>:<port> to listen on for TCP clientstcp_address = "0.0.0.0:4160"## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4161"## address that will be registered with lookupd (defaults to the OS hostname)broadcast_address = "172.28.15.1"## duration of time a producer will remain in the active list since its last pinginactive_producer_timeout = "300s"## duration of time a producer will remain tombstoned if registration remainstombstone_lifetime = "45s"
nsqadmin.cfg文件
cat nsqadmin.cfg#配置如下## <addr>:<port> to listen on for HTTP clientshttp_address = "0.0.0.0:4171"## graphite HTTP addressgraphite_url = ""## proxy HTTP requests to graphiteproxy_graphite = false## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)statsd_prefix = "nsq.%s"## format of statsd counter statsstatsd_counter_format = "stats.counters.%s.count"## format of statsd gauge statsstatsd_gauge_format = "stats.gauges.%s"## time interval nsqd is configured to push to statsd (must match nsqd)statsd_interval = "60s"## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sentnotification_http_endpoint = ""## nsqlookupd HTTP addressesnsqlookupd_http_addresses = [    "172.28.15.1:4161"]## nsqd HTTP addresses (optional)#nsqd_http_addresses = [#    "192.168.1.6:4151",#    "192.168.1.7:4151",#]