2 Commits

Author SHA1 Message Date
  n0m1s 4b73ac9048
send message to statsd 2 years ago
  n0m1s f0dee2e3f0
add statsd prefix 2 years ago
4 changed files with 63 additions and 20 deletions
Unified View
  1. +8
    -8
      example.toml
  2. +14
    -5
      src/config.rs
  3. +38
    -4
      src/main.rs
  4. +3
    -3
      src/mqtt.rs

+ 8
- 8
example.toml View File

@ -7,18 +7,18 @@ hostname = "localhost"
# Which Statsd server to send data to # Which Statsd server to send data to
[statsd] [statsd]
hostname = "localhost" hostname = "localhost"
port = 8125 # optional, default: 8125
prefix = "mqtt.stats" # optional, default: "mqtt.stats"
#port = 8125 # optional, default: 8125
#prefix = "mqtt.stats" # optional, default: "mqtt.stats"
# List of topics to subscribe to, and corresponding stat # List of topics to subscribe to, and corresponding stat
[[topics]] [[topics]]
mqtt_topic = "$SYS/broker/messages/publish/sent"
statsd_topic = "publish_message_sent"
mqtt = "$SYS/broker/messages/publish/sent"
statsd = "publish_message_sent"
[[topics]] [[topics]]
mqtt_topic = "$SYS/broker/clients/connected"
statsd_topic = "connected_clients"
mqtt = "$SYS/broker/clients/connected"
statsd = "connected_clients"
[[topics]] [[topics]]
mqtt_topic = "hello/world"
statsd_topic = "hello_world"
mqtt = "hello/world"
statsd = "hello_world"

+ 14
- 5
src/config.rs View File

@ -24,7 +24,7 @@ pub struct MqttConfig {
#[serde(default = "default_mqtt_port")] #[serde(default = "default_mqtt_port")]
pub port: u32, pub port: u32,
#[serde(default = "default_timeout")]
#[serde(default = "default_mqtt_timeout")]
pub timeout: u32 pub timeout: u32
} }
@ -35,6 +35,9 @@ pub struct MqttConfig {
pub struct StatsdConfig { pub struct StatsdConfig {
pub hostname: String, pub hostname: String,
#[serde(default = "default_statsd_prefix")]
pub prefix: String,
#[serde(default = "default_statsd_port")] #[serde(default = "default_statsd_port")]
pub port: u32 pub port: u32
} }
@ -43,8 +46,8 @@ pub struct StatsdConfig {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct Topic { pub struct Topic {
pub mqtt_topic: String,
pub statsd_topic: String
pub mqtt: String,
pub statsd: String
} }
// ============================================================================= // =============================================================================
@ -66,12 +69,18 @@ fn default_mqtt_port() -> u32 {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
fn default_mqtt_timeout() -> u32 {
60
}
//------------------------------------------------------------------------------
fn default_statsd_port() -> u32 { fn default_statsd_port() -> u32 {
8125 8125
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
fn default_timeout() -> u32 {
60
fn default_statsd_prefix() -> String {
String::from("mqtt.stats")
} }

+ 38
- 4
src/main.rs View File

@ -1,9 +1,11 @@
use std::{thread, error::Error, time::Duration};
use std::{thread, error::Error, time::Duration, collections::HashMap};
use log::{error, warn, info, debug}; use log::{error, warn, info, debug};
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice}; use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
use paho_mqtt as paho; use paho_mqtt as paho;
use statsd;
mod cli_args; mod cli_args;
mod config; mod config;
mod mqtt; mod mqtt;
@ -26,8 +28,27 @@ fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
fn handle_msg( msg: &paho::Message ) {
info!("New message: {}", msg)
fn handle_msg(
msg: &paho::Message,
topics: &HashMap<String, String>,
statsd: &statsd::Client ) {
let value = msg.payload_str().parse::<f64>();
if value.is_err() {
warn!(
"{}: message \"{}\" is not a number",
msg.topic(),
msg.payload_str()
);
return;
}
let value = value.unwrap();
let mqtt_topic = msg.topic();
let statsd_topic = topics.get(mqtt_topic).unwrap();
statsd.gauge(statsd_topic, value);
info!("{} ({}): {}", mqtt_topic, statsd_topic, value);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -41,6 +62,19 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
let rx = mqtt.start_consuming(); let rx = mqtt.start_consuming();
mqtt::subscribe( &mqtt, &config.topics, args.verbose ); mqtt::subscribe( &mqtt, &config.topics, args.verbose );
// init statsd
let statsd = statsd::Client::new(
format!("{}:{}", config.statsd.hostname, config.statsd.port),
&config.statsd.prefix
)?;
// MQTT -> statsd topic translation table
let mut topics = HashMap::new();
for topic in config.topics {
topics.insert( topic.mqtt, topic.statsd );
}
let topics = topics;
// handle ^C signal to quit gracefully // handle ^C signal to quit gracefully
let ctrlc_mqtt = mqtt.clone(); let ctrlc_mqtt = mqtt.clone();
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
@ -54,7 +88,7 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
} }
if let Some(msg) = msg { if let Some(msg) = msg {
handle_msg(&msg)
handle_msg(&msg, &topics, &statsd);
} }
} }


+ 3
- 3
src/mqtt.rs View File

@ -30,11 +30,11 @@ pub fn create( config: &MqttConfig ) -> Result<paho::Client, paho::Error> {
pub fn subscribe( mqtt: &paho::Client, topics: &Vec<Topic>, verbose: bool ) { pub fn subscribe( mqtt: &paho::Client, topics: &Vec<Topic>, verbose: bool ) {
for topic in topics { for topic in topics {
let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1);
let e = mqtt.subscribe(topic.mqtt.as_str(), 1);
if e.is_ok() { if e.is_ok() {
info!( "Topic subscribed: \"{}\"", topic.mqtt_topic );
info!( "Topic subscribed: \"{}\"", topic.mqtt );
} else if verbose { } else if verbose {
warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt_topic );
warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt );
} }
} }
} }

Loading…
Cancel
Save