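Introduction to NSQ
NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day.
It promotes distributed and decentralized topologies without single points of failure, which gives fault tolerance and high availability together with a reliable message delivery guarantee. See the features and guarantees.
Operationally, NSQ is easy to configure and deploy (all parameters are specified on the command line, and the compiled binaries have no runtime dependencies). For maximum flexibility it is agnostic to data format (messages can be JSON, MsgPack, Protocol Buffers, or anything else). The official Go and Python libraries work out of the box (there are many other client libraries as well), and if you are interested in building your own library, there is a protocol spec.
Features:
- Aims for simple deployment
- Aims for high availability, avoids single points of failure, decentralized design
- Guarantees message delivery
- Producers and consumers are discovered automatically, consumers connect to all producers, and messages are pushed to consumers
- Provides an HTTP interface
- Provides client libraries for almost every programming language
NSQ is a realtime message processing system written in Go by the well-known URL-shortening service bitly. It offers high performance, high reliability, and no single point of failure, which makes it a very good, relatively new message queue solution.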
Source code: https://github.com/nsqio/nsq
Downloading and installing NSQ
Official installation guide: https://nsq.io/overview/quick…
Official download page: https://nsq.io/deployment/ins…
# Create the working directory and enter it
mkdir -p /data/software
cd /data/software
# Download the NSQ package
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
# Extract it
tar -zxvf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
# Copy it to the target directory (create the parent directory first so cp does not fail)
mkdir -p /data/beyond
cp -r /data/software/nsq-0.3.8.linux-amd64.go1.6.2 /data/beyond/nsq
Reference for the configuration files that can be used at startup:
https://liushuchun.gitbooks.i…
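Starting the services
nohup /data/beyond/nsq/bin/nsqd &
# (Daemon that receives, queues, and delivers messages to clients.) For example: nsqd -config=/home/nsq/bin/nsqd.cfg
nohup /data/beyond/nsq/bin/nsqlookupd &
# (Daemon that provides a runtime discovery service, so consumers can find the nsqd producers for a given topic.)
nohup /data/beyond/nsq/bin/nsqadmin &
# (Web UI for managing your NSQ cluster in real time. It talks to the nsqlookupd instances to locate producers,
# so it needs --lookupd-http-address or --nsqd-http-address, on the command line or in a config file, as in the commands that follow.)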
Starting with a specified port
nohup /data/beyond/nsq/bin/nsqadmin --lookupd-http-address localhost:4161 &
Starting with configuration files
nohup /data/beyond/nsq/bin/nsqlookupd -config=/data/beyond/nsq/conf/nsqlookupd.cfg &
nohup /data/beyond/nsq/bin/nsqadmin -config=/data/beyond/nsq/conf/nsqadmin.cfg &
nohup /data/beyond/nsq/bin/nsqd -config=/data/beyond/nsq/conf/nsqd.cfg &
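Once nsqd is running, the HTTP interface mentioned in the feature list can be used to push a test message. The following is a minimal sketch using only the Python standard library, assuming nsqd listens on its default HTTP port 4151 and exposes the `/pub` endpoint; the topic name `test` and the helper names `build_pub_request` and `publish` are made up for illustration.

```python
import urllib.parse
import urllib.request


def build_pub_request(nsqd_http_addr: str, topic: str, body: bytes) -> urllib.request.Request:
    """Build (but do not send) a POST request for nsqd's /pub endpoint.

    nsqd_http_addr is "host:port" of nsqd's HTTP listener (http_address in nsqd.cfg).
    """
    query = urllib.parse.urlencode({"topic": topic})
    url = f"http://{nsqd_http_addr}/pub?{query}"
    return urllib.request.Request(url, data=body, method="POST")


def publish(nsqd_http_addr: str, topic: str, body: bytes, timeout: float = 5.0) -> int:
    """Send one message to nsqd over HTTP and return the HTTP status code."""
    request = build_pub_request(nsqd_http_addr, topic, body)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

With a local nsqd started as above, `publish("127.0.0.1:4151", "test", b"hello world")` should create the topic if it does not exist yet and enqueue the message; the topic then shows up in nsqadmin.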
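Cluster management
I remember someone saying that when Go code talks to NSQ, the only option is to configure several addresses in the code and call them in round-robin fashion.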
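The round-robin idea can be sketched independently of any client library. The example below is an illustration only, assuming the statement refers to publishing: the class `RoundRobinPublisher` and the `send` callback are hypothetical names, and `send` stands in for whatever actually delivers a message to one nsqd (a client library call or an HTTP request).

```python
import itertools
from typing import Callable, Iterable


class RoundRobinPublisher:
    """Rotate over a fixed list of nsqd addresses, skipping nodes that fail."""

    def __init__(self, addresses: Iterable[str], send: Callable[[str, str, bytes], None]):
        self._addresses = list(addresses)
        if not self._addresses:
            raise ValueError("at least one nsqd address is required")
        self._cycle = itertools.cycle(self._addresses)
        self._send = send  # hypothetical transport: send(address, topic, body)

    def publish(self, topic: str, body: bytes) -> str:
        """Try each address at most once, starting from the next one in rotation.

        Returns the address that accepted the message.
        """
        last_error = None
        for _ in range(len(self._addresses)):
            address = next(self._cycle)
            try:
                self._send(address, topic, body)
                return address
            except OSError as exc:  # connection refused, timeout, ...
                last_error = exc
        raise ConnectionError("all nsqd addresses failed") from last_error
```

Each call advances the rotation, so healthy nodes share the load, and a node that is down costs one failed attempt per call until it comes back.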
Test environment configuration, e.g. 192.168.1.6
The nsqd.cfg file
cat nsqd.cfg
# The configuration is as follows
## enable verbose logging
verbose = false
## unique identifier (int) for this worker (will default to a hash of hostname)
id = 66
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"
## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"
## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.6"
## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["192.168.1.6:4160"]
## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"
## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"
## path to store disk-backed messages
# data_path = "/var/lib/nsq"
## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000
## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600
## number of messages per diskqueue fsync
sync_every = 2500
## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"
## duration to wait before auto-requeuing a message
msg_timeout = "60s"
## maximum duration before a message will timeout
max_msg_timeout = "15m"
## maximum size of a single message in bytes
max_msg_size = 1024768
## maximum requeuing timeout for a message
max_req_timeout = "1h"
## maximum size of a single command body
max_body_size = 5123840
## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"
## maximum RDY count for a client
max_rdy_count = 2500
## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536
## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"
## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"
## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"
## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"
## toggle sending memory and GC stats to statsd
statsd_mem_stats = true
## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
100.0,
99.0,
95.0
]
## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"
## path to certificate file
tls_cert = ""
## path to private key file
tls_key = ""
## set policy on client certificate (require - client must provide certificate,
## require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"
## set custom root Certificate Authority
# tls_root_ca_file = ""
## require client TLS upgrades
tls_required = false
## minimum TLS version ("ssl3.0", "tls1.0", "tls1.1", "tls1.2")
tls_min_version = ""
## enable deflate feature negotiation (client compression)
deflate = true
## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6
## enable snappy feature negotiation (client compression)
snappy = true
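Several of the nsqd settings above come in pairs of a default and an upper bound (`msg_timeout` / `max_msg_timeout`, `max_msg_size` / `max_body_size`). The sketch below checks such pairs against each other. It is only an illustration: the two rules are sanity checks chosen here, not validation that nsqd is known to perform, the helper names are hypothetical, and the duration parser handles only the single-unit forms used in this file (such as `"60s"`, `"15m"`, `"1h"`), not the full Go `time.Duration` syntax.

```python
import re

# Seconds per unit for the single-unit duration strings used in the config above.
_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}


def parse_duration(text: str) -> float:
    """Convert a string such as "60s" or "15m" to seconds (simplified parser)."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)(ms|s|m|h)", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    return float(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def check_nsqd_limits(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    if parse_duration(cfg["msg_timeout"]) > parse_duration(cfg["max_msg_timeout"]):
        problems.append("msg_timeout exceeds max_msg_timeout")
    if cfg["max_msg_size"] > cfg["max_body_size"]:
        problems.append("max_msg_size exceeds max_body_size")
    return problems
```

For the values shown above (`msg_timeout = "60s"`, `max_msg_timeout = "15m"`, `max_msg_size = 1024768`, `max_body_size = 5123840`) the check reports no problems.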
The nsqlookupd.cfg file
cat nsqlookupd.cfg
# The configuration is as follows
## enable verbose logging
verbose = false
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"
## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.6"
## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"
## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
The nsqadmin.cfg file
cat nsqadmin.cfg
# The configuration is as follows
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4171"
## graphite HTTP address
graphite_url = ""
## proxy HTTP requests to graphite
proxy_graphite = false
## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"
## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"
## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"
## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"
## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""
## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = ["192.168.1.6:4161"]
## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
# "192.168.1.6:4151",
# "192.168.1.7:4151",
#]
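nsqadmin and consumers find nsqd instances by asking nsqlookupd over HTTP (port 4161 in the configuration above). The sketch below shows roughly what such a lookup involves, assuming the `/lookup?topic=...` endpoint and a JSON response with a `producers` list whose entries carry `broadcast_address` and `tcp_port`. Releases before 1.0, such as the 0.3.8 build used here, are assumed to wrap the result in a `data` envelope, so both shapes are accepted. The helper names are hypothetical.

```python
import json
import urllib.parse


def lookup_url(lookupd_http_addr: str, topic: str) -> str:
    """URL that asks one nsqlookupd which nsqd nodes produce the given topic."""
    query = urllib.parse.urlencode({"topic": topic})
    return f"http://{lookupd_http_addr}/lookup?{query}"


def producers_from_lookup(payload: str) -> list:
    """Extract "host:tcp_port" strings from a /lookup response body."""
    doc = json.loads(payload)
    # Older releases are assumed to nest the result under "data"; newer ones return it directly.
    data = doc["data"] if isinstance(doc.get("data"), dict) else doc
    return [
        f"{producer['broadcast_address']}:{producer['tcp_port']}"
        for producer in data.get("producers", [])
    ]
```

The addresses returned this way are the `broadcast_address` values each nsqd registered, which is why that setting must be reachable from clients.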
Test environment cluster configuration, second node, e.g. 192.168.1.7
Note: nsqlookupd_tcp_addresses still points to 192.168.1.6, so this nsqd registers with the nsqlookupd on the first node.
The nsqd.cfg file
cat nsqd.cfg
# The configuration is as follows
## enable verbose logging
verbose = false
## unique identifier (int) for this worker (will default to a hash of hostname)
id = 77
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"
## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"
## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.7"
## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["192.168.1.6:4160"]
## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"
## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"
## path to store disk-backed messages
# data_path = "/var/lib/nsq"
## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000
## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600
## number of messages per diskqueue fsync
sync_every = 2500
## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"
## duration to wait before auto-requeuing a message
msg_timeout = "60s"
## maximum duration before a message will timeout
max_msg_timeout = "15m"
## maximum size of a single message in bytes
max_msg_size = 1024768
## maximum requeuing timeout for a message
max_req_timeout = "1h"
## maximum size of a single command body
max_body_size = 5123840
## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"
## maximum RDY count for a client
max_rdy_count = 2500
## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536
## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"
## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"
## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"
## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"
## toggle sending memory and GC stats to statsd
statsd_mem_stats = true
## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
100.0,
99.0,
95.0
]
## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"
## path to certificate file
tls_cert = ""
## path to private key file
tls_key = ""
## set policy on client certificate (require - client must provide certificate,
## require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"
## set custom root Certificate Authority
# tls_root_ca_file = ""
## require client TLS upgrades
tls_required = false
## minimum TLS version ("ssl3.0", "tls1.0", "tls1.1", "tls1.2")
tls_min_version = ""
## enable deflate feature negotiation (client compression)
deflate = true
## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6
## enable snappy feature negotiation (client compression)
snappy = true
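Compared with the first node, this file changes only `id` and `broadcast_address`. When maintaining several nearly identical files by hand, a small comparison helper can confirm that nothing else drifted. The sketch below is a rough illustration with hypothetical function names; it reads simple `key = value` lines, skips comments, and for multi-line values such as `e2e_processing_latency_percentiles` compares only the first line.

```python
def parse_flat_settings(text: str) -> dict:
    """Collect `key = value` pairs from a config text, ignoring comments and blank lines."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def differing_keys(first_text: str, second_text: str) -> dict:
    """Map each key whose value differs to a (first, second) pair; None means missing."""
    first = parse_flat_settings(first_text)
    second = parse_flat_settings(second_text)
    return {
        key: (first.get(key), second.get(key))
        for key in sorted(first.keys() | second.keys())
        if first.get(key) != second.get(key)
    }
```

Feeding it the contents of the two nsqd.cfg files from 192.168.1.6 and 192.168.1.7 should report exactly the `broadcast_address` and `id` entries.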
Configuration for another environment, e.g. 172.28.15.1
The nsqd.cfg file
cat nsqd.cfg
# The configuration is as follows
## enable verbose logging
verbose = false
## unique identifier (int) for this worker (will default to a hash of hostname)
id = 66
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"
## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"
## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "172.28.15.1"
## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["172.28.15.1:4160"]
## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"
## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"
## path to store disk-backed messages
# data_path = "/var/lib/nsq"
## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000
## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600
## number of messages per diskqueue fsync
sync_every = 2500
## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"
## duration to wait before auto-requeuing a message
msg_timeout = "60s"
## maximum duration before a message will timeout
max_msg_timeout = "15m"
## maximum size of a single message in bytes
max_msg_size = 1024768
## maximum requeuing timeout for a message
max_req_timeout = "1h"
## maximum size of a single command body
max_body_size = 5123840
## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"
## maximum RDY count for a client
max_rdy_count = 2500
## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536
## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"
## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"
## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"
## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"
## toggle sending memory and GC stats to statsd
statsd_mem_stats = true
## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
100.0,
99.0,
95.0
]
## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"
## path to certificate file
tls_cert = ""
## path to private key file
tls_key = ""
## set policy on client certificate (require - client must provide certificate,
## require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"
## set custom root Certificate Authority
# tls_root_ca_file = ""
## require client TLS upgrades
tls_required = false
## minimum TLS version ("ssl3.0", "tls1.0", "tls1.1", "tls1.2")
tls_min_version = ""
## enable deflate feature negotiation (client compression)
deflate = true
## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6
## enable snappy feature negotiation (client compression)
snappy = true
The nsqlookupd.cfg file
cat nsqlookupd.cfg
# The configuration is as follows
## enable verbose logging
verbose = false
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"
## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "172.28.15.1"
## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"
## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
The nsqadmin.cfg file
cat nsqadmin.cfg
# The configuration is as follows
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4171"
## graphite HTTP address
graphite_url = ""
## proxy HTTP requests to graphite
proxy_graphite = false
## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"
## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"
## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"
## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"
## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""
## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = ["172.28.15.1:4161"]
## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
# "192.168.1.6:4151",
# "192.168.1.7:4151",
#]