add more nats debug
This commit is contained in:
parent
36e9065474
commit
3ff004afa0
@ -69,6 +69,8 @@ type Agent struct {
|
|||||||
NatsServer string
|
NatsServer string
|
||||||
NatsProxyPath string
|
NatsProxyPath string
|
||||||
NatsProxyPort string
|
NatsProxyPort string
|
||||||
|
NatsPingInterval int
|
||||||
|
NatsWSCompression bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -171,10 +173,19 @@ func New(logger *logrus.Logger, version string) *Agent {
|
|||||||
|
|
||||||
// check if using nats standard tcp, otherwise use nats websockets by default
|
// check if using nats standard tcp, otherwise use nats websockets by default
|
||||||
var natsServer string
|
var natsServer string
|
||||||
|
var natsWsCompression bool
|
||||||
if ac.NatsStandardPort != "" {
|
if ac.NatsStandardPort != "" {
|
||||||
natsServer = fmt.Sprintf("tls://%s:%s", ac.APIURL, ac.NatsStandardPort)
|
natsServer = fmt.Sprintf("tls://%s:%s", ac.APIURL, ac.NatsStandardPort)
|
||||||
} else {
|
} else {
|
||||||
natsServer = fmt.Sprintf("wss://%s:%s", ac.APIURL, natsProxyPort)
|
natsServer = fmt.Sprintf("wss://%s:%s", ac.APIURL, natsProxyPort)
|
||||||
|
natsWsCompression = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var natsPingInterval int
|
||||||
|
if ac.NatsPingInterval == 0 {
|
||||||
|
natsPingInterval = randRange(35, 45)
|
||||||
|
} else {
|
||||||
|
natsPingInterval = ac.NatsPingInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Agent{
|
return &Agent{
|
||||||
@ -204,6 +215,8 @@ func New(logger *logrus.Logger, version string) *Agent {
|
|||||||
NatsServer: natsServer,
|
NatsServer: natsServer,
|
||||||
NatsProxyPath: natsProxyPath,
|
NatsProxyPath: natsProxyPath,
|
||||||
NatsProxyPort: natsProxyPort,
|
NatsProxyPort: natsProxyPort,
|
||||||
|
NatsPingInterval: natsPingInterval,
|
||||||
|
NatsWSCompression: natsWsCompression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,14 +412,18 @@ func (a *Agent) SyncMeshNodeID() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) setupNatsOptions() []nats.Option {
|
func (a *Agent) setupNatsOptions() []nats.Option {
|
||||||
|
reconnectWait := randRange(2, 8)
|
||||||
opts := make([]nats.Option, 0)
|
opts := make([]nats.Option, 0)
|
||||||
opts = append(opts, nats.Name(a.AgentID))
|
opts = append(opts, nats.Name(a.AgentID))
|
||||||
opts = append(opts, nats.UserInfo(a.AgentID, a.Token))
|
opts = append(opts, nats.UserInfo(a.AgentID, a.Token))
|
||||||
opts = append(opts, nats.ReconnectWait(time.Second*5))
|
opts = append(opts, nats.ReconnectWait(time.Duration(reconnectWait)*time.Second))
|
||||||
opts = append(opts, nats.RetryOnFailedConnect(true))
|
opts = append(opts, nats.RetryOnFailedConnect(true))
|
||||||
|
opts = append(opts, nats.PingInterval(time.Duration(a.NatsPingInterval)*time.Second))
|
||||||
|
opts = append(opts, nats.Compression(a.NatsWSCompression))
|
||||||
opts = append(opts, nats.MaxReconnects(-1))
|
opts = append(opts, nats.MaxReconnects(-1))
|
||||||
opts = append(opts, nats.ReconnectBufSize(-1))
|
opts = append(opts, nats.ReconnectBufSize(-1))
|
||||||
opts = append(opts, nats.ProxyPath(a.NatsProxyPath))
|
opts = append(opts, nats.ProxyPath(a.NatsProxyPath))
|
||||||
|
opts = append(opts, nats.ReconnectJitter(500*time.Millisecond, 4*time.Second))
|
||||||
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||||
a.Logger.Errorln("NATS disconnected:", err)
|
a.Logger.Errorln("NATS disconnected:", err)
|
||||||
a.Logger.Errorf("%+v\n", nc.Statistics)
|
a.Logger.Errorf("%+v\n", nc.Statistics)
|
||||||
|
@ -158,6 +158,7 @@ func NewAgentConfig() *rmm.AgentConfig {
|
|||||||
NatsProxyPath: viper.GetString("natsproxypath"),
|
NatsProxyPath: viper.GetString("natsproxypath"),
|
||||||
NatsProxyPort: viper.GetString("natsproxyport"),
|
NatsProxyPort: viper.GetString("natsproxyport"),
|
||||||
NatsStandardPort: viper.GetString("natsstandardport"),
|
NatsStandardPort: viper.GetString("natsstandardport"),
|
||||||
|
NatsPingInterval: viper.GetInt("natspinginterval"),
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,8 @@ func NewAgentConfig() *rmm.AgentConfig {
|
|||||||
natsProxyPath, _, _ := k.GetStringValue("NatsProxyPath")
|
natsProxyPath, _, _ := k.GetStringValue("NatsProxyPath")
|
||||||
natsProxyPort, _, _ := k.GetStringValue("NatsProxyPort")
|
natsProxyPort, _, _ := k.GetStringValue("NatsProxyPort")
|
||||||
natsStandardPort, _, _ := k.GetStringValue("NatsStandardPort")
|
natsStandardPort, _, _ := k.GetStringValue("NatsStandardPort")
|
||||||
|
natsPingInterval, _, _ := k.GetStringValue("NatsPingInterval")
|
||||||
|
npi, _ := strconv.Atoi(natsPingInterval)
|
||||||
|
|
||||||
return &rmm.AgentConfig{
|
return &rmm.AgentConfig{
|
||||||
BaseURL: baseurl,
|
BaseURL: baseurl,
|
||||||
@ -79,6 +81,7 @@ func NewAgentConfig() *rmm.AgentConfig {
|
|||||||
NatsProxyPath: natsProxyPath,
|
NatsProxyPath: natsProxyPath,
|
||||||
NatsProxyPort: natsProxyPort,
|
NatsProxyPort: natsProxyPort,
|
||||||
NatsStandardPort: natsStandardPort,
|
NatsStandardPort: natsStandardPort,
|
||||||
|
NatsPingInterval: npi,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,6 +55,8 @@ func (a *Agent) RunRPC() {
|
|||||||
|
|
||||||
opts := a.setupNatsOptions()
|
opts := a.setupNatsOptions()
|
||||||
nc, err := nats.Connect(a.NatsServer, opts...)
|
nc, err := nats.Connect(a.NatsServer, opts...)
|
||||||
|
a.Logger.Debugf("%+v\n", nc)
|
||||||
|
a.Logger.Debugf("%+v\n", nc.Opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.Logger.Fatalln("RunRPC() nats.Connect()", err)
|
a.Logger.Fatalln("RunRPC() nats.Connect()", err)
|
||||||
}
|
}
|
||||||
|
@ -45,6 +45,7 @@ type AgentConfig struct {
|
|||||||
NatsProxyPath string
|
NatsProxyPath string
|
||||||
NatsProxyPort string
|
NatsProxyPort string
|
||||||
NatsStandardPort string
|
NatsStandardPort string
|
||||||
|
NatsPingInterval int
|
||||||
}
|
}
|
||||||
|
|
||||||
type RunScriptResp struct {
|
type RunScriptResp struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user