diff --git a/agent/agent.go b/agent/agent.go index 45c43b0..a5d36fb 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,35 +40,37 @@ import ( // Agent struct type Agent struct { - Hostname string - Arch string - AgentID string - BaseURL string - ApiURL string - Token string - AgentPK int - Cert string - ProgramDir string - EXE string - SystemDrive string - MeshInstaller string - MeshSystemEXE string - MeshSVC string - PyBin string - Headers map[string]string - Logger *logrus.Logger - Version string - Debug bool - rClient *resty.Client - Proxy string - LogTo string - LogFile *os.File - Platform string - GoArch string - ServiceConfig *service.Config - NatsServer string - NatsProxyPath string - NatsProxyPort string + Hostname string + Arch string + AgentID string + BaseURL string + ApiURL string + Token string + AgentPK int + Cert string + ProgramDir string + EXE string + SystemDrive string + MeshInstaller string + MeshSystemEXE string + MeshSVC string + PyBin string + Headers map[string]string + Logger *logrus.Logger + Version string + Debug bool + rClient *resty.Client + Proxy string + LogTo string + LogFile *os.File + Platform string + GoArch string + ServiceConfig *service.Config + NatsServer string + NatsProxyPath string + NatsProxyPort string + NatsPingInterval int + NatsWSCompression bool } const ( @@ -171,39 +173,50 @@ func New(logger *logrus.Logger, version string) *Agent { // check if using nats standard tcp, otherwise use nats websockets by default var natsServer string + var natsWsCompression bool if ac.NatsStandardPort != "" { natsServer = fmt.Sprintf("tls://%s:%s", ac.APIURL, ac.NatsStandardPort) } else { natsServer = fmt.Sprintf("wss://%s:%s", ac.APIURL, natsProxyPort) + natsWsCompression = true + } + + var natsPingInterval int + if ac.NatsPingInterval == 0 { + natsPingInterval = randRange(35, 45) + } else { + natsPingInterval = ac.NatsPingInterval } return &Agent{ - Hostname: info.Hostname, - BaseURL: ac.BaseURL, - AgentID: ac.AgentID, - ApiURL: ac.APIURL, - Token: ac.Token, - AgentPK: ac.PK, - Cert: ac.Cert, - ProgramDir: pd, - EXE: exe, - SystemDrive: sd, - MeshInstaller: "meshagent.exe", - MeshSystemEXE: MeshSysExe, - MeshSVC: meshSvcName, - PyBin: pybin, - Headers: headers, - Logger: logger, - Version: version, - Debug: logger.IsLevelEnabled(logrus.DebugLevel), - rClient: restyC, - Proxy: ac.Proxy, - Platform: runtime.GOOS, - GoArch: runtime.GOARCH, - ServiceConfig: svcConf, - NatsServer: natsServer, - NatsProxyPath: natsProxyPath, - NatsProxyPort: natsProxyPort, + Hostname: info.Hostname, + BaseURL: ac.BaseURL, + AgentID: ac.AgentID, + ApiURL: ac.APIURL, + Token: ac.Token, + AgentPK: ac.PK, + Cert: ac.Cert, + ProgramDir: pd, + EXE: exe, + SystemDrive: sd, + MeshInstaller: "meshagent.exe", + MeshSystemEXE: MeshSysExe, + MeshSVC: meshSvcName, + PyBin: pybin, + Headers: headers, + Logger: logger, + Version: version, + Debug: logger.IsLevelEnabled(logrus.DebugLevel), + rClient: restyC, + Proxy: ac.Proxy, + Platform: runtime.GOOS, + GoArch: runtime.GOARCH, + ServiceConfig: svcConf, + NatsServer: natsServer, + NatsProxyPath: natsProxyPath, + NatsProxyPort: natsProxyPort, + NatsPingInterval: natsPingInterval, + NatsWSCompression: natsWsCompression, } } @@ -399,14 +412,18 @@ func (a *Agent) SyncMeshNodeID() { } func (a *Agent) setupNatsOptions() []nats.Option { + reconnectWait := randRange(2, 8) opts := make([]nats.Option, 0) opts = append(opts, nats.Name(a.AgentID)) opts = append(opts, nats.UserInfo(a.AgentID, a.Token)) - opts = append(opts, nats.ReconnectWait(time.Second*5)) + opts = append(opts, nats.ReconnectWait(time.Duration(reconnectWait)*time.Second)) opts = append(opts, nats.RetryOnFailedConnect(true)) + opts = append(opts, nats.PingInterval(time.Duration(a.NatsPingInterval)*time.Second)) + opts = append(opts, nats.Compression(a.NatsWSCompression)) opts = append(opts, nats.MaxReconnects(-1)) opts = append(opts, nats.ReconnectBufSize(-1)) opts = append(opts, nats.ProxyPath(a.NatsProxyPath)) + opts = append(opts, nats.ReconnectJitter(500*time.Millisecond, 4*time.Second)) opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { a.Logger.Errorln("NATS disconnected:", err) a.Logger.Errorf("%+v\n", nc.Statistics) diff --git a/agent/agent_unix.go b/agent/agent_unix.go index f33a307..a3b06f1 100644 --- a/agent/agent_unix.go +++ b/agent/agent_unix.go @@ -158,6 +158,7 @@ func NewAgentConfig() *rmm.AgentConfig { NatsProxyPath: viper.GetString("natsproxypath"), NatsProxyPort: viper.GetString("natsproxyport"), NatsStandardPort: viper.GetString("natsstandardport"), + NatsPingInterval: viper.GetInt("natspinginterval"), } return ret } diff --git a/agent/agent_windows.go b/agent/agent_windows.go index 446ddd6..263d1c1 100644 --- a/agent/agent_windows.go +++ b/agent/agent_windows.go @@ -65,6 +65,8 @@ func NewAgentConfig() *rmm.AgentConfig { natsProxyPath, _, _ := k.GetStringValue("NatsProxyPath") natsProxyPort, _, _ := k.GetStringValue("NatsProxyPort") natsStandardPort, _, _ := k.GetStringValue("NatsStandardPort") + natsPingInterval, _, _ := k.GetStringValue("NatsPingInterval") + npi, _ := strconv.Atoi(natsPingInterval) return &rmm.AgentConfig{ BaseURL: baseurl, @@ -79,6 +81,7 @@ func NewAgentConfig() *rmm.AgentConfig { NatsProxyPath: natsProxyPath, NatsProxyPort: natsProxyPort, NatsStandardPort: natsStandardPort, + NatsPingInterval: npi, } } diff --git a/agent/rpc.go b/agent/rpc.go index b883ead..5eaf456 100644 --- a/agent/rpc.go +++ b/agent/rpc.go @@ -55,6 +55,8 @@ func (a *Agent) RunRPC() { opts := a.setupNatsOptions() nc, err := nats.Connect(a.NatsServer, opts...) + a.Logger.Debugf("%+v\n", nc) + a.Logger.Debugf("%+v\n", nc.Opts) if err != nil { a.Logger.Fatalln("RunRPC() nats.Connect()", err) } diff --git a/shared/types.go b/shared/types.go index de7f2a9..f857ccb 100644 --- a/shared/types.go +++ b/shared/types.go @@ -45,6 +45,7 @@ type AgentConfig struct { NatsProxyPath string NatsProxyPort string NatsStandardPort string + NatsPingInterval int } type RunScriptResp struct {