2 Commits

6 changed files with 222 additions and 20 deletions
Unified View
  1. +130
    -4
      Cargo.lock
  2. +4
    -1
      Cargo.toml
  3. +1
    -0
      example.toml
  4. +11
    -2
      src/config.rs
  5. +72
    -11
      src/main.rs
  6. +4
    -2
      src/mqtt.rs

+ 130
- 4
Cargo.lock View File

@ -174,6 +174,22 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "ctrlc"
version = "3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e95fbd621905b854affdc67943b043a0fbb6ed7385fd5a25650d19a8a6cfdf"
dependencies = [
"nix",
"windows-sys",
]
[[package]]
name = "deranged"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"
@ -352,6 +368,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.147" version = "0.2.147"
@ -381,12 +403,35 @@ name = "mqtt2statsd"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"clap", "clap",
"ctrlc",
"log",
"paho-mqtt", "paho-mqtt",
"serde", "serde",
"simplelog",
"statsd", "statsd",
"toml", "toml",
] ]
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags",
"cfg-if",
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.18.0" version = "1.18.0"
@ -545,6 +590,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "simplelog"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acee08041c5de3d5048c8b3f6f13fafb3026b24ba43c6a695a0c76179b844369"
dependencies = [
"log",
"termcolor",
"time",
]
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.9" version = "0.4.9"
@ -580,6 +636,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.48" version = "1.0.48"
@ -600,11 +665,41 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "time"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
dependencies = [
"deranged",
"itoa",
"libc",
"num_threads",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
dependencies = [
"time-core",
]
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.7.8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257"
checksum = "c226a7bba6d859b63c92c4b4fe69c5b6b72d0cb897dbc8e6012298e6154cb56e"
dependencies = [ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
@ -623,9 +718,9 @@ dependencies = [
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.19.15"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
checksum = "8ff63e60a958cefbb518ae1fd6566af80d9d4be430a33f3723dfc47d1d411d95"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"serde", "serde",
@ -658,6 +753,37 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"


+ 4
- 1
Cargo.toml View File

@ -9,4 +9,7 @@ clap = { version = "4.3", features = [ "derive", "cargo" ]}
paho-mqtt = "0.12" paho-mqtt = "0.12"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
statsd = "0.16" statsd = "0.16"
toml = "0.7"
toml = "0.8"
simplelog = "0.12"
log = "0.4"
ctrlc = "3.4"

+ 1
- 0
example.toml View File

@ -2,6 +2,7 @@
[mqtt] [mqtt]
hostname = "localhost" hostname = "localhost"
#port = 1883 # optional, default: 1883 #port = 1883 # optional, default: 1883
#timeout = 60 # in seconds, optional, default: 60
# Which Statsd server to send data to # Which Statsd server to send data to
[statsd] [statsd]


+ 11
- 2
src/config.rs View File

@ -21,8 +21,11 @@ pub struct Config {
pub struct MqttConfig { pub struct MqttConfig {
pub hostname: String, pub hostname: String,
#[serde(default = "default_mqtt_port") ]
pub port: u32
#[serde(default = "default_mqtt_port")]
pub port: u32,
#[serde(default = "default_timeout")]
pub timeout: u32
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -66,3 +69,9 @@ fn default_mqtt_port() -> u32 {
fn default_statsd_port() -> u32 { fn default_statsd_port() -> u32 {
8125 8125
} }
//------------------------------------------------------------------------------
fn default_timeout() -> u32 {
60
}

+ 72
- 11
src/main.rs View File

@ -1,32 +1,93 @@
use std::error::Error;
use std::{thread, error::Error, time::Duration};
use log::{error, warn, info, debug};
use simplelog::{ TermLogger, LevelFilter, ConfigBuilder, TerminalMode, ColorChoice};
use paho_mqtt as paho;
mod cli_args; mod cli_args;
mod config; mod config;
mod mqtt; mod mqtt;
fn main() -> Result<(), Box<dyn Error>> {
let args = cli_args::parse();
let config = config::from_file(&args.configuration)?;
//------------------------------------------------------------------------------
println!("{:#?}", config); //TODO: remove
fn try_reconnect( mqtt: &paho::Client, timeout: u32 ) -> bool {
warn!("MQTT server lost, trying to reconnect...");
for _ in 0..timeout {
thread::sleep(Duration::from_secs(1));
if mqtt.reconnect().is_ok() {
info!("MQTT server reconnected");
return true;
}
}
error!("MQTT server timed out");
false
}
//------------------------------------------------------------------------------
fn handle_msg( msg: &paho::Message )
{
info!("New message: {}", msg)
}
//------------------------------------------------------------------------------
fn main_impl( args: &cli_args::Args ) -> Result<(), Box<dyn Error>> {
let config = config::from_file(&args.configuration)?;
debug!("Config: {:?}", config);
// init MQTT
let mqtt = mqtt::create( &config.mqtt )?; let mqtt = mqtt::create( &config.mqtt )?;
let rx = mqtt.start_consuming(); let rx = mqtt.start_consuming();
mqtt::subscribe( &mqtt, &config.topics, args.verbose ); mqtt::subscribe( &mqtt, &config.topics, args.verbose );
// handle ^C signal to quit gracefully
let ctrlc_mqtt = mqtt.clone();
ctrlc::set_handler(move || {
ctrlc_mqtt.stop_consuming()
})?;
// main event loop
for msg in rx.iter() { for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
} else if !mqtt.is_connected() {
println!("disconnected");
if !mqtt.is_connected() && !try_reconnect(&mqtt, config.mqtt.timeout) {
break; break;
} }
if let Some(msg) = msg {
handle_msg(&msg)
}
} }
if mqtt.is_connected() { if mqtt.is_connected() {
println!("Disconnecting");
//mqtt.unsubscribe_many(topics)
info!("Disconnecting");
mqtt.disconnect(None)?;
} }
Ok(()) Ok(())
} }
//------------------------------------------------------------------------------
fn main() {
let args = cli_args::parse();
// set up logging
TermLogger::init(
match args.verbose {
true => LevelFilter::Debug,
false => LevelFilter::Info
},
ConfigBuilder::new()
.set_time_format_rfc3339()
.set_time_offset_to_local().unwrap()
.build(),
TerminalMode::Mixed,
ColorChoice::Auto
).unwrap();
// actually run the program
if let Err(e) = main_impl(&args) {
error!("Error: {}", e)
}
}

+ 4
- 2
src/mqtt.rs View File

@ -1,5 +1,7 @@
extern crate paho_mqtt as paho; extern crate paho_mqtt as paho;
use log::{warn, info};
use crate::config::MqttConfig; use crate::config::MqttConfig;
use crate::config::Topic; use crate::config::Topic;
@ -30,9 +32,9 @@ pub fn subscribe( mqtt: &paho::Client, topics: &Vec<Topic>, verbose: bool ) {
for topic in topics { for topic in topics {
let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1); let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1);
if e.is_ok() { if e.is_ok() {
println!( "Subscribed to {}", topic.mqtt_topic );
info!( "Topic subscribed: \"{}\"", topic.mqtt_topic );
} else if verbose { } else if verbose {
println!( "Cannot subscribe to {} (error: {e:?})", topic.mqtt_topic );
warn!( "Cannot subscribe to topic \"{}\" (error: {e:?})", topic.mqtt_topic );
} }
} }
} }

Loading…
Cancel
Save