|
|
@ -1,9 +1,11 @@ |
|
|
use std::{thread, error::Error, time::Duration};
|
|
|
|
|
|
|
|
|
use std::{thread, error::Error, time::Duration, collections::HashMap};
|
|
|
|
|
|
|
|
|
use log::{error, warn, info, debug};
|
|
|
use log::{error, warn, info, debug};
|
|
|
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
|
|
|
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
|
|
|
use paho_mqtt as paho;
|
|
|
use paho_mqtt as paho;
|
|
|
|
|
|
|
|
|
|
|
|
use statsd;
|
|
|
|
|
|
|
|
|
mod cli_args;
|
|
|
mod cli_args;
|
|
|
mod config;
|
|
|
mod config;
|
|
|
mod mqtt;
|
|
|
mod mqtt;
|
|
|
@ -26,8 +28,27 @@ fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool { |
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
fn handle_msg( msg: &paho::Message ) {
|
|
|
|
|
|
info!("New message: {}", msg)
|
|
|
|
|
|
|
|
|
fn handle_msg(
|
|
|
|
|
|
msg: &paho::Message,
|
|
|
|
|
|
topics: &HashMap<String, String>,
|
|
|
|
|
|
statsd: &statsd::Client ) {
|
|
|
|
|
|
|
|
|
|
|
|
let value = msg.payload_str().parse::<f64>();
|
|
|
|
|
|
if value.is_err() {
|
|
|
|
|
|
warn!(
|
|
|
|
|
|
"{}: message \"{}\" is not a number",
|
|
|
|
|
|
msg.topic(),
|
|
|
|
|
|
msg.payload_str()
|
|
|
|
|
|
);
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
let value = value.unwrap();
|
|
|
|
|
|
|
|
|
|
|
|
let mqtt_topic = msg.topic();
|
|
|
|
|
|
let statsd_topic = topics.get(mqtt_topic).unwrap();
|
|
|
|
|
|
|
|
|
|
|
|
statsd.gauge(statsd_topic, value);
|
|
|
|
|
|
info!("{} ({}): {}", mqtt_topic, statsd_topic, value);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
//------------------------------------------------------------------------------
|
|
|
@ -41,6 +62,19 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> { |
|
|
let rx = mqtt.start_consuming();
|
|
|
let rx = mqtt.start_consuming();
|
|
|
mqtt::subscribe( &mqtt, &config.topics, args.verbose );
|
|
|
mqtt::subscribe( &mqtt, &config.topics, args.verbose );
|
|
|
|
|
|
|
|
|
|
|
|
// init statsd
|
|
|
|
|
|
let statsd = statsd::Client::new(
|
|
|
|
|
|
format!("{}:{}", config.statsd.hostname, config.statsd.port),
|
|
|
|
|
|
&config.statsd.prefix
|
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
|
|
// MQTT -> statsd topic translation table
|
|
|
|
|
|
let mut topics = HashMap::new();
|
|
|
|
|
|
for topic in config.topics {
|
|
|
|
|
|
topics.insert( topic.mqtt, topic.statsd );
|
|
|
|
|
|
}
|
|
|
|
|
|
let topics = topics;
|
|
|
|
|
|
|
|
|
// handle ^C signal to quit gracefully
|
|
|
// handle ^C signal to quit gracefully
|
|
|
let ctrlc_mqtt = mqtt.clone();
|
|
|
let ctrlc_mqtt = mqtt.clone();
|
|
|
ctrlc::set_handler(move || {
|
|
|
ctrlc::set_handler(move || {
|
|
|
@ -54,7 +88,7 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> { |
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
if let Some(msg) = msg {
|
|
|
if let Some(msg) = msg {
|
|
|
handle_msg(&msg)
|
|
|
|
|
|
|
|
|
handle_msg(&msg, &topics, &statsd);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|