From ec2a85e1ae1bb84240d3b48e2c201208666a9040 Mon Sep 17 00:00:00 2001 From: n0m1s Date: Mon, 18 Sep 2023 21:23:43 -0700 Subject: [PATCH] split code in separate modules --- src/cli_args.rs | 22 ++++++++++ src/config.rs | 68 ++++++++++++++++++++++++++++++ src/main.rs | 110 +++++------------------------------------------- src/mqtt.rs | 38 +++++++++++++++++ 4 files changed, 139 insertions(+), 99 deletions(-) create mode 100644 src/cli_args.rs create mode 100644 src/config.rs create mode 100644 src/mqtt.rs diff --git a/src/cli_args.rs b/src/cli_args.rs new file mode 100644 index 0000000..786f82b --- /dev/null +++ b/src/cli_args.rs @@ -0,0 +1,22 @@ +use std::path::PathBuf; +use clap::Parser; + +//------------------------------------------------------------------------------ + +/// MQTT to Statsd bridge. +#[derive(Parser, Debug)] +#[clap( name = "mqtt2statsd", version = clap::crate_version!())] +pub struct Args { + /// Log MQTT & statsd messages on the console + #[arg(short, long)] + pub verbose: bool, + + /// Configuration file + pub configuration: PathBuf +} + +//------------------------------------------------------------------------------ + +pub fn parse() -> Args { + Args::parse() +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..a4487a5 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,68 @@ +use std::error::Error; +use std::fs; +use std::path::PathBuf; + +use serde::Deserialize; + +// ============================================================================= +// Structs +// ============================================================================= + +#[derive(Debug, Deserialize)] +pub struct Config { + pub mqtt: MqttConfig, + pub statsd: StatsdConfig, + pub topics: Vec +} + +//------------------------------------------------------------------------------ + +#[derive(Debug, Deserialize)] +pub struct MqttConfig { + pub hostname: String, + + #[serde(default = "default_mqtt_port") ] + pub port: u32 +} + +//------------------------------------------------------------------------------ + + +#[derive(Debug, Deserialize)] +pub struct StatsdConfig { + pub hostname: String, + + #[serde(default = "default_statsd_port")] + pub port: u32 +} + +//------------------------------------------------------------------------------ + +#[derive(Debug, Deserialize)] +pub struct Topic { + pub mqtt_topic: String, + pub statsd_topic: String +} + +// ============================================================================= +// Public Functions +// ============================================================================= + +pub fn from_file( path: &PathBuf ) -> Result> { + let config_str = fs::read_to_string(path)?; + Ok( toml::from_str(&config_str)? ) +} + +// ============================================================================= +// Private Functions +// ============================================================================= + +fn default_mqtt_port() -> u32 { + 1883 +} + +//------------------------------------------------------------------------------ + +fn default_statsd_port() -> u32 { + 8125 +} diff --git a/src/main.rs b/src/main.rs index c6e46b7..501b4c4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,108 +1,18 @@ -use std::path::PathBuf; -use std::fs; +use std::error::Error; -use clap::Parser; -use serde::Deserialize; +mod cli_args; +mod config; +mod mqtt; -extern crate paho_mqtt as mqtt; - -// ============================================================================= -// CLI arguments -// ============================================================================= - -/// MQTT to Statsd bridge. -#[derive(Parser, Debug)] -#[clap( name = "mqtt2statsd", version = clap::crate_version!())] -struct Args { - /// Log MQTT & statsd messages on the console - #[arg(short, long)] - verbose: bool, - - /// Configuration file - configuration: PathBuf -} - -// ============================================================================= -// Configuration file structure -// ============================================================================= - -#[derive(Debug, Deserialize)] -struct Config { - mqtt: MqttConfig, - statsd: StatsdConfig, - topics: Vec -} - -//------------------------------------------------------------------------------ - -fn default_mqtt_port() -> u32 { 1883 } - -#[derive(Debug, Deserialize)] -struct MqttConfig { - hostname: String, - - #[serde(default = "default_mqtt_port") ] - port: u32 -} - -//------------------------------------------------------------------------------ - -fn default_statsd_port() -> u32 { 8125 } - -#[derive(Debug, Deserialize)] -struct StatsdConfig { - hostname: String, - - #[serde(default = "default_statsd_port")] - port: u32 -} - -//------------------------------------------------------------------------------ - -#[derive(Debug, Deserialize)] -struct Topic { - mqtt_topic: String, - statsd_topic: String -} - -// ============================================================================= -// Main code -// ============================================================================= - -fn main() { - let args = Args::parse(); - - let config_str = fs::read_to_string(args.configuration) - .expect("Failed to read configuration file"); - let config: Config = toml::from_str(&config_str) - .expect("Error in configuration file"); +fn main() -> Result<(), Box> { + let args = cli_args::parse(); + let config = config::from_file(&args.configuration)?; println!("{:#?}", config); //TODO: remove - let mqtt_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(config.mqtt.hostname) - .finalize(); - - let mqtt = mqtt::Client::new(mqtt_opts) - .expect("Cannot create mqtt client"); - + let mqtt = mqtt::create( &config.mqtt )?; let rx = mqtt.start_consuming(); - - let conn_opts = mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(std::time::Duration::from_secs(20)) - .finalize(); - - mqtt.connect(conn_opts) - .expect( "Cannot connect" ); - - for topic in config.topics { - let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1); - if e.is_ok() { - println!( "Subscribed to {}", topic.mqtt_topic ); - } else if args.verbose { - println!( "Cannot subscribe to {} (error: {e:?})", topic.mqtt_topic ); - } - } + mqtt::subscribe( &mqtt, &config.topics, args.verbose ); for msg in rx.iter() { if let Some(msg) = msg { @@ -117,4 +27,6 @@ fn main() { println!("Disconnecting"); //mqtt.unsubscribe_many(topics) } + + Ok(()) } diff --git a/src/mqtt.rs b/src/mqtt.rs new file mode 100644 index 0000000..fcb3d89 --- /dev/null +++ b/src/mqtt.rs @@ -0,0 +1,38 @@ +extern crate paho_mqtt as paho; + +use crate::config::MqttConfig; +use crate::config::Topic; + +// ============================================================================= +// Public Functions +// ============================================================================= + +pub fn create( config: &MqttConfig ) -> Result { + let client_opts = paho::CreateOptionsBuilder::new() + .server_uri(&config.hostname) + .finalize(); + + let ret = paho::Client::new(client_opts)?; + + let conn_opts = paho::ConnectOptionsBuilder::new() + .keep_alive_interval(std::time::Duration::from_secs(20)) + .finalize(); + + ret.connect(conn_opts)?; + + Ok(ret) +} + +//------------------------------------------------------------------------------ + +pub fn subscribe( mqtt: &paho::Client, topics: &Vec, verbose: bool ) { + + for topic in topics { + let e = mqtt.subscribe(topic.mqtt_topic.as_str(), 1); + if e.is_ok() { + println!( "Subscribed to {}", topic.mqtt_topic ); + } else if verbose { + println!( "Cannot subscribe to {} (error: {e:?})", topic.mqtt_topic ); + } + } +}