From d20a58abd204fc848520e40813ea0292cd23f3b6 Mon Sep 17 00:00:00 2001 From: n0m1s Date: Mon, 18 Sep 2023 22:44:14 -0700 Subject: [PATCH] exit gracefully when MQTT server is disconnected --- Cargo.lock | 22 ++++++++++++++++++++++ Cargo.toml | 1 + example.toml | 1 + src/config.rs | 13 +++++++++++-- src/main.rs | 46 ++++++++++++++++++++++++++++++++-------------- 5 files changed, 67 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7caf121..05b63bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,6 +174,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "ctrlc" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf" +dependencies = [ + "nix", + "windows-sys", +] + [[package]] name = "deranged" version = "0.3.8" @@ -393,6 +403,7 @@ name = "mqtt2statsd" version = "0.1.0" dependencies = [ "clap", + "ctrlc", "log", "paho-mqtt", "serde", @@ -401,6 +412,17 @@ dependencies = [ "toml", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "num_threads" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 0849a38..e63971b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,4 @@ statsd = "0.16" toml = "0.8" simplelog = "0.12" log = "0.4" +ctrlc = "3.4" diff --git a/example.toml b/example.toml index 9b3347f..5c35bcf 100644 --- a/example.toml +++ b/example.toml @@ -2,6 +2,7 @@ [mqtt] hostname = "localhost" #port = 1883 # optional, default: 1883 +#timeout = 60 # in seconds, optional, default: 60 # Which Statsd server to send data to [statsd] diff --git a/src/config.rs b/src/config.rs index a4487a5..e8c3c36 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,8 +21,11 @@ pub struct Config { pub struct MqttConfig { pub hostname: String, - #[serde(default = "default_mqtt_port") ] - pub port: u32 + #[serde(default = "default_mqtt_port")] + pub port: u32, + + #[serde(default = "default_timeout")] + pub timeout: u32 } //------------------------------------------------------------------------------ @@ -66,3 +69,9 @@ fn default_mqtt_port() -> u32 { fn default_statsd_port() -> u32 { 8125 } + +//------------------------------------------------------------------------------ + +fn default_timeout() -> u32 { + 60 +} diff --git a/src/main.rs b/src/main.rs index 94b9c4c..5c51a6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,32 @@ -use std::error::Error; - -extern crate log; -use log::{error, info, debug}; +use std::{thread, error::Error, time::Duration}; +use log::{error, warn, info, debug}; use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice}; - -use paho_mqtt::Message; +use paho_mqtt as paho; mod cli_args; mod config; mod mqtt; -fn handle_msg( msg: &Message ) -{ +//------------------------------------------------------------------------------ + +fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool { + warn!("MQTT server lost, trying to reconnect..."); + for _ in 0..timeout { + thread::sleep(Duration::from_secs(1)); + if mqtt.reconnect().is_ok() { + info!("MQTT server reconnected"); + return true; + } + } + + error!("MQTT server timed out"); + false +} + +//------------------------------------------------------------------------------ + +fn handle_msg( msg: &paho::Message ) { info!("New message: {}", msg) } @@ -27,12 +41,15 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box> { let rx = mqtt.start_consuming(); mqtt::subscribe( &mqtt, &config.topics, args.verbose ); + // handle ^C signal to quit gracefully + let ctrlc_mqtt = mqtt.clone(); + ctrlc::set_handler(move || { + ctrlc_mqtt.stop_consuming() + })?; + // main event loop for msg in rx.iter() { - if !mqtt.is_connected() { - if args.verbose { - println!( ) - } + if !mqtt.is_connected() && !try_reconnect(&mqtt, config.mqtt.timeout) { break; } @@ -41,9 +58,10 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box> { } } + // clean up before quitting if mqtt.is_connected() { - println!("Disconnecting"); - //mqtt.unsubscribe_many(topics) + info!("Disconnecting"); + mqtt.disconnect(None)?; } Ok(())