path: root/src/main.rs
diff options
authorLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-03-30 20:59:54 +0100
committerLibravatar Daniel López Azaña <daniloaz@gmail.com>2024-03-30 20:59:58 +0100
commit7af7a7626a8e83fe3f9c3b0d2ad7d2b32da41d45 (patch)
tree319b9976f6274f42e748d1b501229bb219584f66 /src/main.rs
parentc6745a22e79f0556a19b0d44a181fb9d8ed78f90 (diff)
WARNING: unstable commit.
🔧 Update .gitignore to exclude .vscode directory ✨ Add futures-util and url dependencies to Cargo.toml ♻️ Refactor error handling into separate error module in Rust project 💡 Add get_password method documentation in connection_details.rs ♻️ Replace String with Transport enum for forced_transport in connection_options.rs ✨ Implement WebSocket connection logic in ls_client.rs with async support ✨ Add ClientStatus, ConnectionType, and DisconnectionType enums to manage client states in ls_client.rs ✨ (main.rs): add Transport enum to LightstreamerClient imports for WebSocket support ♻️ (main.rs): refactor signal handling to use SharedState struct for clean shutdown ✨ (main.rs): implement AtomicBool for graceful disconnect handling 📝 (main.rs): update comments to reflect new signal handling logic ✨ (main.rs): set forced transport to WebSocket streaming in Lightstreamer client options ✨ (util.rs): create new util module with clean_message function for message sanitization
Diffstat (limited to 'src/main.rs')
1 files changed, 23 insertions, 190 deletions
diff --git a/src/main.rs b/src/main.rs
index 1c4b05a..87f9da4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,6 @@
use hyper::client;
use lightstreamer_client::item_update::ItemUpdate;
-use lightstreamer_client::ls_client::LightstreamerClient;
+use lightstreamer_client::ls_client::{LightstreamerClient, Transport};
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
use lightstreamer_client::subscription_listener::SubscriptionListener;
@@ -11,10 +11,16 @@ use serde_urlencoded;
use signal_hook::low_level::signal_name;
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
+struct SharedState {
+ client: Arc<Mutex<LightstreamerClient>>,
+ should_disconnect: Arc<AtomicBool>,
/// Sets up a signal hook for SIGINT and SIGTERM.
/// Creates a signal hook for the specified signals and spawns a thread to handle them.
@@ -29,7 +35,7 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
/// The function panics if it fails to create the signal iterator.
-async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
+async fn setup_signal_hook(shared_state: Arc<Mutex<SharedState>>) {
// Create a signal set of signals to be handled and a signal iterator to monitor them.
let signals = &[SIGINT, SIGTERM];
let mut signals_iterator = Signals::new(signals).expect("Failed to create signal iterator");
@@ -41,198 +47,18 @@ async fn setup_signal_hook(client: Arc<Mutex<LightstreamerClient>>) {
// Clean up and prepare to exit...
// ...
- let mut client = client.lock().await;
- client.disconnect();
- // Exit with 0 code to indicate orderly shutdown.
- std::process::exit(0);
- }
- });
-async fn establish_persistent_http_connection(
- session_id_shared: Arc<Mutex<String>>,
-) -> Result<(), reqwest::Error> {
- let client = Client::new();
- let params = [
- ("LS_adapter_set", "DEMO"),
- ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
- ];
- let request_url =
- "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0";
- let response = client.post(request_url).form(&params).send().await?;
- if response.status().is_success() {
- let mut stream = response.bytes_stream();
- while let Some(item) = stream.next().await {
- match item {
- Ok(bytes) => {
- let response_text = String::from_utf8(bytes.to_vec())
- .expect("Failed to convert bytes to string");
- if let Some(start) = response_text.find("CONOK,") {
- if let Some(end) = response_text.find(",50000,5000,*\r\n") {
- let session_id = &response_text[start + 6..end];
- println!("Session ID: {}", session_id);
- let mut session_id_lock = session_id_shared.lock().await;
- *session_id_lock = session_id.to_string();
- }
- } else {
- println!("New message: {}", response_text);
- }
- }
- Err(e) => println!("Error while receiving: {:?}", e),
- }
- }
- } else {
- println!("Response was not successful: {}", response.status());
- }
- Ok(())
-// Establish a persistent WebSocket connection and handle the session creation
-async fn establish_persistent_ws_connection(
- session_id_shared: Arc<Mutex<String>>,
-) -> Result<(), Box<dyn Error>> {
- let ws_url = "wss://push.lightstreamer.com/lightstreamer";
- let (ws_stream, _) = tokio_tungstenite::connect_async_with_config(
- tokio_tungstenite::tungstenite::protocol::handshake::client::Request::from((ws_url, [("Sec-WebSocket-Protocol", "your-subprotocol")].iter().cloned()))
- ).await.expect("Failed to connect");
- let (mut write, mut read) = ws_stream.split();
- // Session creation parameters
- let params = [
- ("LS_op2", "create_session"),
- ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
- ("LS_adapter_set", "DEMO"),
- ];
- let encoded_params = serde_urlencoded::to_string(&params)?;
- // Send the create session message
- write
- .send(Message::Text(format!("{}\n", encoded_params)))
- .await?;
- // Listen for messages from the server
- while let Some(message) = read.next().await {
- match message? {
- Message::Text(text) => {
- if text.starts_with("CONOK") {
- let session_info: Vec<&str> = text.split(",").collect();
- let session_id = session_info.get(1).unwrap_or(&"").to_string();
- *session_id_shared.lock().await = session_id.clone();
- println!("Session established with ID: {}", session_id);
- subscribe_to_channel_ws(session_id, write).await?;
- break; // Exit after successful subscription
- }
- }
- _ => {}
- }
- }
- Ok(())
-async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> {
- let client = Client::new();
- //let subscribe_url = "http://push.lightstreamer.com/lightstreamer/bind_session.txt";
- //let params = [("LS_session", &session_id)];
- let subscribe_url =
- "http://push.lightstreamer.com/lightstreamer/control.txt?LS_protocol=TLCP-2.0.0";
- let params = [
- ("LS_session", &session_id),
- ("LS_op", &"add".to_string()),
- ("LS_subId", &"1".to_string()),
- ("LS_data_adapter", &"CHAT_ROOM".to_string()),
- ("LS_group", &"chat_room".to_string()),
- ("LS_schema", &"timestamp message".to_string()),
- ("LS_mode", &"DISTINCT".to_string()),
- ("LS_reqId", &"1".to_string()),
- ];
- let response = client.post(subscribe_url).form(&params).send().await?;
- if response.status().is_success() {
- println!("Subscription successful!");
- } else {
- println!("Subscription failed: {}", response.status());
- }
- Ok(())
-// Function to subscribe to a channel using WebSocket
-async fn subscribe_to_channel_ws(
- session_id: String,
- mut write: futures::stream::SplitSink<
- tokio_tungstenite::WebSocketStream<
- tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
- >,
- tokio_tungstenite::tungstenite::protocol::Message,
- >,
-) -> Result<(), Box<dyn Error>> {
- // Example subscription to ITEM1 in MERGE mode from the DEMO adapter set
- let sub_params = [
- ("LS_table", "1"),
- ("LS_op2", "add"),
- ("LS_session", &session_id),
- ("LS_id", "item1"),
- ("LS_schema", "stock_name last_price"),
- ("LS_mode", "MERGE"),
- ];
- let encoded_sub_params = serde_urlencoded::to_string(&sub_params)?;
- // Send the subscription message
- write.send(Message::Text(encoded_sub_params)).await?;
- println!("Subscribed to channel with session ID: {}", session_id);
- Ok(())
-async fn main() -> Result<(), Box<dyn Error>> {
- let session_id_shared = Arc::new(Mutex::new(String::new()));
- let session_id_shared_clone = session_id_shared.clone();
- let task1 = tokio::spawn(async move {
- establish_persistent_http_connection(session_id_shared_clone).await.unwrap();
- });
- println!("Established connection to Lightstreamer server");
- let task2 = tokio::spawn(async move {
- let mut session_established = false;
- loop {
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
- let session_id;
- session_id = session_id_shared.lock().await.clone();
+ let shared_state = shared_state.lock().await;
+ shared_state.should_disconnect.store(true, Ordering::Relaxed);
+ let mut client = shared_state.client.lock().await;
+ client.disconnect();
- if !session_established && !session_id.is_empty() {
- println!("Accessed Session ID from another thread: {}", session_id);
- session_established = true;
- subscribe_to_channel(session_id).await.unwrap();
- }
+ // Exit with 0 code to indicate orderly shutdown.
+ std::process::exit(0);
- task1.await?;
- task2.await?;
- Ok(())
pub struct MySubscriptionListener {}
@@ -277,10 +103,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut client = client.lock().await;
+ client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
+ let should_disconnect = Arc::new(AtomicBool::new(false));
+ let shared_state = Arc::new(Mutex::new(SharedState {
+ client: client.clone(),
+ should_disconnect: should_disconnect.clone(),
+ }));
// Spawn a new thread to handle SIGINT and SIGTERM process signals.
- setup_signal_hook(client.clone()).await;
+ setup_signal_hook(shared_state).await;
// Infinite loop that will indefinitely retry failed connections unless
@@ -290,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut retry_counter: u64 = 0;
loop {
let mut client = client.lock().await;
- match client.connect() {
+ match client.connect().await {
Ok(_) => {}
Err(e) => {
println!("Failed to connect: {:?}", e);