diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 51 |
1 files changed, 22 insertions, 29 deletions
diff --git a/src/main.rs b/src/main.rs index 87f9da4..8431333 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,14 +13,9 @@ use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals}; use std::error::Error; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Notify, Mutex}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -struct SharedState { - client: Arc<Mutex<LightstreamerClient>>, - should_disconnect: Arc<AtomicBool>, -} - /// Sets up a signal hook for SIGINT and SIGTERM. /// /// Creates a signal hook for the specified signals and spawns a thread to handle them. @@ -35,7 +30,7 @@ struct SharedState { /// /// The function panics if it fails to create the signal iterator. /// -async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) { +async fn setup_signal_hook(shutdown_signal: Arc<Notify>) { // Create a signal set of signals to be handled and a signal iterator to monitor them. let signals = &[SIGINT, SIGTERM]; let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator"); @@ -44,18 +39,8 @@ async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) { tokio::spawn(async move { for signal in signals_iterator.forever() { println!("Received signal: {}", signal_name(signal).unwrap()); - // - // Clean up and prepare to exit... - // ... - { - let shared_state = shared_state.lock().await; - shared_state.should_disconnect.store(true, Ordering::Relaxed); - let mut client = shared_state.client.lock().await; - client.disconnect(); - } - - // Exit with 0 code to indicate orderly shutdown. - std::process::exit(0); + let _ = shutdown_signal.notify_one(); + break; } }); } @@ -106,14 +91,10 @@ async fn main() -> Result<(), Box<dyn Error>> { client.connection_options.set_forced_transport(Some(Transport::WsStreaming)); } - let should_disconnect = Arc::new(AtomicBool::new(false)); - let shared_state = Arc::new(Mutex::new(SharedState { - client: client.clone(), - should_disconnect: should_disconnect.clone(), - })); - + // Create a new Notify instance to send a shutdown signal to the signal handler thread. + let shutdown_signal = Arc::new(tokio::sync::Notify::new()); // Spawn a new thread to handle SIGINT and SIGTERM process signals. - setup_signal_hook(shared_state).await; + setup_signal_hook(Arc::clone(&shutdown_signal)).await; // // Infinite loop that will indefinitely retry failed connections unless @@ -121,10 +102,13 @@ async fn main() -> Result<(), Box<dyn Error>> { // let mut retry_interval_milis: u64 = 0; let mut retry_counter: u64 = 0; - loop { + while retry_counter < 5 { let mut client = client.lock().await; - match client.connect().await { - Ok(_) => {} + match client.connect(Arc::clone(&shutdown_signal)).await { + Ok(_) => { + client.disconnect().await; + break; + } Err(e) => { println!("Failed to connect: {:?}", e); tokio::time::sleep(std::time::Duration::from_millis(retry_interval_milis)).await; @@ -137,4 +121,13 @@ async fn main() -> Result<(), Box<dyn Error>> { } } } + + if retry_counter == 5 { + println!("Failed to connect after {} retries. Exiting...", retry_counter); + } else { + println!("Exiting orderly from Lightstreamer client..."); + } + + // Exit using std::process::exit() to avoid waiting for existing tokio tasks to complete. + std::process::exit(0); } |