diff --git a/example.toml b/example.toml index 5c35bcf..d4c299c 100644 --- a/example.toml +++ b/example.toml @@ -7,18 +7,18 @@ hostname = "localhost" # Which Statsd server to send data to [statsd] hostname = "localhost" -port = 8125 # optional, default: 8125 -prefix = "mqtt.stats" # optional, default: "mqtt.stats" +#port = 8125 # optional, default: 8125 +#prefix = "mqtt.stats" # optional, default: "mqtt.stats" # List of topics to subscribe to, and corresponding stat [[topics]] -mqtt_topic = "$SYS/broker/messages/publish/sent" -statsd_topic = "publish_message_sent" +mqtt = "$SYS/broker/messages/publish/sent" +statsd = "publish_message_sent" [[topics]] -mqtt_topic = "$SYS/broker/clients/connected" -statsd_topic = "connected_clients" +mqtt = "$SYS/broker/clients/connected" +statsd = "connected_clients" [[topics]] -mqtt_topic = "hello/world" -statsd_topic = "hello_world" +mqtt = "hello/world" +statsd = "hello_world" diff --git a/src/config.rs b/src/config.rs index f2e5ee3..c2e14fc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -46,8 +46,8 @@ pub struct StatsdConfig { #[derive(Debug, Deserialize)] pub struct Topic { - pub mqtt_topic: String, - pub statsd_topic: String + pub mqtt: String, + pub statsd: String } // ============================================================================= diff --git a/src/main.rs b/src/main.rs index 5c51a6a..2c893e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,11 @@ -use std::{thread, error::Error, time::Duration}; +use std::{thread, error::Error, time::Duration, collections::HashMap}; use log::{error, warn, info, debug}; use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice}; use paho_mqtt as paho; +use statsd; + mod cli_args; mod config; mod mqtt; @@ -26,8 +28,27 @@ fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool { //------------------------------------------------------------------------------ -fn handle_msg( msg: &paho::Message ) { - info!("New message: {}", msg) +fn handle_msg( + msg: &paho::Message, + topics: &HashMap, + statsd: &statsd::Client ) { + + let value = msg.payload_str().parse::(); + if value.is_err() { + warn!( + "{}: message \"{}\" is not a number", + msg.topic(), + msg.payload_str() + ); + return; + } + let value = value.unwrap(); + + let mqtt_topic = msg.topic(); + let statsd_topic = topics.get(mqtt_topic).unwrap(); + + statsd.gauge(statsd_topic, value); + info!("{} ({}): {}", mqtt_topic, statsd_topic, value); } //------------------------------------------------------------------------------ @@ -41,6 +62,19 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box> { let rx = mqtt.start_consuming(); mqtt::subscribe( &mqtt, &config.topics, args.verbose ); + // init statsd + let statsd = statsd::Client::new( + format!("{}:{}", config.statsd.hostname, config.statsd.port), + &config.statsd.prefix + )?; + + // MQTT -> statsd topic translation table + let mut topics = HashMap::new(); + for topic in config.topics { + topics.insert( topic.mqtt, topic.statsd ); + } + let topics = topics; + // handle ^C signal to quit gracefully let ctrlc_mqtt = mqtt.clone(); ctrlc::set_handler(move || { @@ -54,7 +88,7 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box> { } if let Some(msg) = msg { - handle_msg(&msg) + handle_msg(&msg, &topics, &statsd); } } diff --git a/src/mqtt.rs b/src/mqtt.rs index 940d7f6..8bc1374 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -30,11 +30,11 @@ pub fn create( config: &MqttConfig ) -> Result { pub fn subscribe( mqtt: &paho::Client, topics: &Vec, verbose: bool ) { for topic in topics { - let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1); + let e = mqtt.subscribe(topic.mqtt.as_str(), 1); if e.is_ok() { - info!( "Topic subscribed: \"{}\"", topic.mqtt_topic ); + info!( "Topic subscribed: \"{}\"", topic.mqtt ); } else if verbose { - warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt_topic ); + warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt ); } } }