Browse Source

send message to statsd

master
n0m1s 2 years ago
parent
commit
4b73ac9048
Signed by: nomis GPG Key ID: BC0454CAD76FE803
4 changed files with 51 additions and 17 deletions
  1. +8
    -8
      example.toml
  2. +2
    -2
      src/config.rs
  3. +38
    -4
      src/main.rs
  4. +3
    -3
      src/mqtt.rs

+ 8
- 8
example.toml View File

@ -7,18 +7,18 @@ hostname = "localhost"
# Which Statsd server to send data to
[statsd]
hostname = "localhost"
port = 8125 # optional, default: 8125
prefix = "mqtt.stats" # optional, default: "mqtt.stats"
#port = 8125 # optional, default: 8125
#prefix = "mqtt.stats" # optional, default: "mqtt.stats"
# List of topics to subscribe to, and corresponding stat
[[topics]]
mqtt_topic = "$SYS/broker/messages/publish/sent"
statsd_topic = "publish_message_sent"
mqtt = "$SYS/broker/messages/publish/sent"
statsd = "publish_message_sent"
[[topics]]
mqtt_topic = "$SYS/broker/clients/connected"
statsd_topic = "connected_clients"
mqtt = "$SYS/broker/clients/connected"
statsd = "connected_clients"
[[topics]]
mqtt_topic = "hello/world"
statsd_topic = "hello_world"
mqtt = "hello/world"
statsd = "hello_world"

+ 2
- 2
src/config.rs View File

@ -46,8 +46,8 @@ pub struct StatsdConfig {
#[derive(Debug, Deserialize)]
pub struct Topic {
pub mqtt_topic: String,
pub statsd_topic: String
pub mqtt: String,
pub statsd: String
}
// =============================================================================


+ 38
- 4
src/main.rs View File

@ -1,9 +1,11 @@
use std::{thread, error::Error, time::Duration};
use std::{thread, error::Error, time::Duration, collections::HashMap};
use log::{error, warn, info, debug};
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
use paho_mqtt as paho;
use statsd;
mod cli_args;
mod config;
mod mqtt;
@ -26,8 +28,27 @@ fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool {
//------------------------------------------------------------------------------
fn handle_msg( msg: &paho::Message ) {
info!("New message: {}", msg)
fn handle_msg(
msg: &paho::Message,
topics: &HashMap<String, String>,
statsd: &statsd::Client ) {
let value = msg.payload_str().parse::<f64>();
if value.is_err() {
warn!(
"{}: message \"{}\" is not a number",
msg.topic(),
msg.payload_str()
);
return;
}
let value = value.unwrap();
let mqtt_topic = msg.topic();
let statsd_topic = topics.get(mqtt_topic).unwrap();
statsd.gauge(statsd_topic, value);
info!("{} ({}): {}", mqtt_topic, statsd_topic, value);
}
//------------------------------------------------------------------------------
@ -41,6 +62,19 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
let rx = mqtt.start_consuming();
mqtt::subscribe( &mqtt, &config.topics, args.verbose );
// init statsd
let statsd = statsd::Client::new(
format!("{}:{}", config.statsd.hostname, config.statsd.port),
&config.statsd.prefix
)?;
// MQTT -> statsd topic translation table
let mut topics = HashMap::new();
for topic in config.topics {
topics.insert( topic.mqtt, topic.statsd );
}
let topics = topics;
// handle ^C signal to quit gracefully
let ctrlc_mqtt = mqtt.clone();
ctrlc::set_handler(move || {
@ -54,7 +88,7 @@ fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
}
if let Some(msg) = msg {
handle_msg(&msg)
handle_msg(&msg, &topics, &statsd);
}
}


+ 3
- 3
src/mqtt.rs View File

@ -30,11 +30,11 @@ pub fn create( config: &MqttConfig ) -> Result<paho::Client, paho::Error> {
pub fn subscribe( mqtt: &paho::Client, topics: &Vec<Topic>, verbose: bool ) {
for topic in topics {
let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1);
let e = mqtt.subscribe(topic.mqtt.as_str(), 1);
if e.is_ok() {
info!( "Topic subscribed: \"{}\"", topic.mqtt_topic );
info!( "Topic subscribed: \"{}\"", topic.mqtt );
} else if verbose {
warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt_topic );
warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt );
}
}
}

Loading…
Cancel
Save