Browse Source

exit gracefully when MQTT server is disconnected

master
n0m1s 2 years ago
parent
commit
d20a58abd2
Signed by: nomis GPG Key ID: BC0454CAD76FE803
5 changed files with 67 additions and 16 deletions
  1. +22
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +1
    -0
      example.toml
  4. +11
    -2
      src/config.rs
  5. +32
    -14
      src/main.rs

+ 22
- 0
Cargo.lock View File

@ -174,6 +174,16 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "ctrlc"
version = "3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf"
dependencies = [
"nix",
"windows-sys",
]
[[package]]
name = "deranged"
version = "0.3.8"
@ -393,6 +403,7 @@ name = "mqtt2statsd"
version = "0.1.0"
dependencies = [
"clap",
"ctrlc",
"log",
"paho-mqtt",
"serde",
@ -401,6 +412,17 @@ dependencies = [
"toml",
]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags",
"cfg-if",
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"


+ 1
- 0
Cargo.toml View File

@ -12,3 +12,4 @@ statsd = "0.16"
toml = "0.8"
simplelog = "0.12"
log = "0.4"
ctrlc = "3.4"

+ 1
- 0
example.toml View File

@ -2,6 +2,7 @@
[mqtt]
hostname = "localhost"
#port = 1883 # optional, default: 1883
#timeout = 60 # in seconds, optional, default: 60
# Which Statsd server to send data to
[statsd]


+ 11
- 2
src/config.rs View File

@ -21,8 +21,11 @@ pub struct Config {
pub struct MqttConfig {
pub hostname: String,
#[serde(default = "default_mqtt_port") ]
pub port: u32
#[serde(default = "default_mqtt_port")]
pub port: u32,
#[serde(default = "default_timeout")]
pub timeout: u32
}
//------------------------------------------------------------------------------
@ -66,3 +69,9 @@ fn default_mqtt_port() -> u32 {
fn default_statsd_port() -> u32 {
8125
}
//------------------------------------------------------------------------------
fn default_timeout() -> u32 {
60
}

+ 32
- 14
src/main.rs View File

@ -1,18 +1,32 @@
use std::error::Error;
extern crate log;
use log::{error, info, debug};
use std::{thread, error::Error, time::Duration};
use log::{error, warn, info, debug};
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
use paho_mqtt::Message;
use paho_mqtt as paho;
mod cli_args;
mod config;
mod mqtt;
fn handle_msg( msg: &Message )
{
//------------------------------------------------------------------------------
fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool {
warn!("MQTT server lost, trying to reconnect...");
for _ in 0..timeout {
thread::sleep(Duration::from_secs(1));
if mqtt.reconnect().is_ok() {
info!("MQTT server reconnected");
return true;
}
}
error!("MQTT server timed out");
false
}
//------------------------------------------------------------------------------
fn handle_msg( msg: &paho::Message ) {
info!("New message: {}", msg)
}
@ -27,12 +41,15 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
let rx = mqtt.start_consuming();
mqtt::subscribe( &mqtt, &config.topics, args.verbose );
// handle ^C signal to quit gracefully
let ctrlc_mqtt = mqtt.clone();
ctrlc::set_handler(move || {
ctrlc_mqtt.stop_consuming()
})?;
// main event loop
for msg in rx.iter() {
if !mqtt.is_connected() {
if args.verbose {
println!( )
}
if !mqtt.is_connected() && !try_reconnect(&mqtt, config.mqtt.timeout) {
break;
}
@ -41,9 +58,10 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
}
}
// clean up before quitting
if mqtt.is_connected() {
println!("Disconnecting");
//mqtt.unsubscribe_many(topics)
info!("Disconnecting");
mqtt.disconnect(None)?;
}
Ok(())


Loading…
Cancel
Save