aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 21:24:12 +0200
committerLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 21:24:12 +0200
commit65282048aefecda06f937ed670755b64b19ce9b1 (patch)
tree1a026ad96955820ab72b02dba79c3c3b8662cacd
parent5af8a69942aeed85b5cd2bd84cb5f33169834690 (diff)
⬆️ (Cargo.toml): Bump lightstreamer-client version from 0.1.8 to 0.1.9
✨ (Cargo.toml): Add colored dependency to enhance console output 📝 (README.md): Overhaul documentation to provide comprehensive details about the project, its features, usage, and contribution guidelines 💡 (client_listener.rs, client_message_listener.rs): Add newline at end of file to adhere to POSIX standards ♻️ (connection_options.rs): Refactor code to improve readability and maintainability 🐛 (connection_options.rs): Rename 'reduce_head' to '_reduce_head' to indicate unused variable ♻️ (error.rs): Reorder imports and adjust formatting for consistency ♻️ (item_update.rs): Refactor code to improve readability and maintainability ♻️ (lib.rs): Reorder module imports for better organization ♻️ (ls_client.rs): Refactor code to improve readability and maintainability 💡 (main.rs): Update print statements to use colored output for changed fields 🐛 (main.rs): Fix import order to follow Rust's idiomatic style 🐛 (proxy.rs, subscription.rs, subscription_listener.rs): Add newline at end of file to follow POSIX standard ♻️ (subscription.rs, subscription_listener.rs): Refactor code to improve readability and maintainability 📝 (util.rs): Add newline at end of file to adhere to POSIX standards
-rw-r--r--Cargo.toml3
-rw-r--r--README.md77
-rw-r--r--src/client_listener.rs2
-rw-r--r--src/client_message_listener.rs2
-rw-r--r--src/connection_options.rs80
-rw-r--r--src/error.rs12
-rw-r--r--src/item_update.rs14
-rw-r--r--src/lib.rs6
-rw-r--r--src/ls_client.rs5
-rw-r--r--src/main.rs43
-rw-r--r--src/proxy.rs2
-rw-r--r--src/subscription.rs73
-rw-r--r--src/subscription_listener.rs24
-rw-r--r--src/util.rs2
14 files changed, 244 insertions, 101 deletions
diff --git a/Cargo.toml b/Cargo.toml
index f11f715..8fbc8a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "lightstreamer-client"
-version = "0.1.8"
+version = "0.1.9"
edition = "2021"
authors = ["Daniel López Azaña <daniloaz@gmail.com>"]
description = "A Rust client for Lightstreamer, designed to facilitate real-time communication with Lightstreamer servers."
@@ -11,6 +11,7 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+colored = "2"
cookie = { version = "0", features = ["percent-encode"]}
futures = "0"
futures-util = "0"
diff --git a/README.md b/README.md
index 793c45e..ec149c4 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,75 @@
-# lightstreamer-client
-Lightstreamer client for Rust
+# Lightstreamer Rust Client SDK
+
+This project is a partial implementation of the Lightstreamer TLCP (Text-based Live Connections Protocol) in Rust. It provides a client SDK to interact with Lightstreamer servers, focused on supporting the specific needs of the [ig_trading_api](https://github.com/daniloaz/ig_trading_api) project.
+
+## Features
+
+- Full-duplex WebSocket-based connection mode.
+- Subscriptions to items and item groups.
+- MERGE subscription mode.
+- Listening to connection events and messages.
+- Configuration of connection options and connection details.
+- Subscription lifecycle management.
+- Retrieval of real-time item updates.
+
+Please note that this SDK currently does not support all the features and capabilities of the full Lightstreamer protocol. It has been developed to cover the requirements of the ig_trading_api project mentioned above. Features like other connection modes, subscription modes (DISTINCT, RAW, COMMAND), and some other advanced options are not implemented at this time.
+
+## Installation
+
+To use this SDK in your Rust project, add the following dependency to your `Cargo.toml`:
+
+```toml
+[dependencies]
+lightstreamer-client = "0.1.9"
+```
+
+## Usage
+
+Here's a minimal example of how to use the Lightstreamer Rust Client SDK:
+
+```rust
+use lightstreamer_client::ls_client::LightstreamerClient;
+use lightstreamer_client::subscription::{Subscription, SubscriptionMode};
+
+#[tokio::main]
+async fn main() {
+ // Create a Lightstreamer client
+ let client = LightstreamerClient::new(
+ Some("http://push.lightstreamer.com/lightstreamer"), // Lightstreamer server
+ Some("DEMO"), // adapter set
+ None, // username
+ None, // password
+ ).unwrap();
+
+ // Create a subscription
+ let mut subscription = Subscription::new(
+ SubscriptionMode::Merge,
+ Some(vec!["item1".to_string(), "item2".to_string()]),
+ Some(vec!["field1".to_string(), "field2".to_string()]),
+ ).unwrap();
+
+ // Subscribe and connect
+ client.subscribe(subscription);
+ client.connect(None).await.unwrap();
+}
+```
+
+For a more advanced example of how to use the SDK to subscribe to item updates, refer to the main.rs file in the project source code. It demonstrates creating a Lightstreamer client, setting up subscriptions, handling item updates, and managing the connection lifecycle with a configurable number of connection attempts.
+
+For more details on using the SDK, please refer to the reference documentation.
+
+## Documentation
+
+The full SDK documentation is available at [docs.rs](https://docs.rs/lightstreamer-client).
+
+## Project Structure
+
+Although this SDK does not provide a complete implementation of the Lightstreamer protocol, it has been built with a solid structure and scaffolding, similar to the official Lightstreamer libraries. The code is documented, and the project is designed to facilitate contributions from the community to add support for missing features.
+
+## License
+
+This project is licensed under the GPL-3.0 License. See the [LICENSE](LICENSE) file for details.
+
+## Contributing
+
+Contributions are welcome. Please open an issue or submit a pull request to propose changes and help complete the SDK with additional Lightstreamer features. \ No newline at end of file
diff --git a/src/client_listener.rs b/src/client_listener.rs
index cb2a73f..e6e2fee 100644
--- a/src/client_listener.rs
+++ b/src/client_listener.rs
@@ -185,4 +185,4 @@ pub trait ClientListener: Debug + Send {
// Implementation for on_status_change
unimplemented!("Implement on_status_change method for ClientListener");
}
-} \ No newline at end of file
+}
diff --git a/src/client_message_listener.rs b/src/client_message_listener.rs
index e6f5fc0..1f77027 100644
--- a/src/client_message_listener.rs
+++ b/src/client_message_listener.rs
@@ -71,4 +71,4 @@ pub trait ClientMessageListener {
// Implementation for on_processed
unimplemented!("Implement on_processed method for ClientMessageListener.");
}
-} \ No newline at end of file
+}
diff --git a/src/connection_options.rs b/src/connection_options.rs
index e9b3ab0..3c3485f 100644
--- a/src/connection_options.rs
+++ b/src/connection_options.rs
@@ -30,7 +30,7 @@ pub struct ConnectionOptions {
slowing_enabled: bool,
stalled_timeout: u64,
send_sync: bool,
- reduce_head: bool,
+ _reduce_head: bool,
supported_diffs: Option<String>,
polling: bool,
ttl_millis: Option<u64>,
@@ -59,7 +59,7 @@ impl ConnectionOptions {
stalled_timeout: 2000,
server_instance_address_ignored: false,
send_sync: true,
- reduce_head: false,
+ _reduce_head: false,
supported_diffs: None,
polling: false,
ttl_millis: None,
@@ -334,7 +334,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative or zero value is configured
- pub fn set_content_length(&mut self, content_length: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_content_length(
+ &mut self,
+ content_length: u64,
+ ) -> Result<(), IllegalArgumentException> {
if content_length == 0 {
return Err(IllegalArgumentException::new(
"Content length cannot be zero",
@@ -369,7 +372,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative or zero value is configured
- pub fn set_first_retry_max_delay(&mut self, first_retry_max_delay: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_first_retry_max_delay(
+ &mut self,
+ first_retry_max_delay: u64,
+ ) -> Result<(), IllegalArgumentException> {
if first_retry_max_delay == 0 {
return Err(IllegalArgumentException::new(
"First retry max delay cannot be zero",
@@ -487,7 +493,8 @@ impl ConnectionOptions {
&mut self,
http_extra_headers_on_session_creation_only: bool,
) {
- self.http_extra_headers_on_session_creation_only = http_extra_headers_on_session_creation_only;
+ self.http_extra_headers_on_session_creation_only =
+ http_extra_headers_on_session_creation_only;
}
/// Setter method that sets the maximum time the Server is allowed to wait for any data to
@@ -521,9 +528,7 @@ impl ConnectionOptions {
/// * `IllegalArgumentException`: if a negative value is configured
pub fn set_idle_timeout(&mut self, idle_timeout: u64) -> Result<(), IllegalArgumentException> {
if idle_timeout == 0 {
- return Err(IllegalArgumentException::new(
- "Idle timeout cannot be zero",
- ));
+ return Err(IllegalArgumentException::new("Idle timeout cannot be zero"));
}
self.idle_timeout = idle_timeout;
@@ -558,13 +563,17 @@ impl ConnectionOptions {
/// See also `setStalledTimeout()`
///
/// See also `setReconnectTimeout()`
- pub fn set_keepalive_interval(&mut self, keepalive_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_keepalive_interval(
+ &mut self,
+ keepalive_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if keepalive_interval == 0 {
self.keepalive_interval = keepalive_interval;
return Ok(());
}
- if keepalive_interval < self.stalled_timeout || keepalive_interval < self.reconnect_timeout {
+ if keepalive_interval < self.stalled_timeout || keepalive_interval < self.reconnect_timeout
+ {
return Err(IllegalArgumentException::new(
"Keepalive interval should be greater than or equal to stalled timeout and reconnect timeout",
));
@@ -616,7 +625,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is configured
- pub fn set_polling_interval(&mut self, polling_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_polling_interval(
+ &mut self,
+ polling_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if polling_interval == 0 {
self.polling_interval = polling_interval;
return Ok(());
@@ -675,7 +687,10 @@ impl ConnectionOptions {
/// See also `setStalledTimeout()`
///
/// See also `setKeepaliveInterval()`
- pub fn set_reconnect_timeout(&mut self, reconnect_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_reconnect_timeout(
+ &mut self,
+ reconnect_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if reconnect_timeout == 0 {
return Err(IllegalArgumentException::new(
"Reconnect timeout cannot be zero",
@@ -719,7 +734,10 @@ impl ConnectionOptions {
/// values) is passed.
///
/// See also `get_real_max_bandwidth()`
- pub fn set_requested_max_bandwidth(&mut self, max_bandwidth: Option<f64>) -> Result<(), IllegalArgumentException> {
+ pub fn set_requested_max_bandwidth(
+ &mut self,
+ max_bandwidth: Option<f64>,
+ ) -> Result<(), IllegalArgumentException> {
if let Some(bandwidth) = max_bandwidth {
if bandwidth <= 0.0 {
return Err(IllegalArgumentException::new(
@@ -781,9 +799,7 @@ impl ConnectionOptions {
/// See also `setFirstRetryMaxDelay()`
pub fn set_retry_delay(&mut self, retry_delay: u64) -> Result<(), IllegalArgumentException> {
if retry_delay == 0 {
- return Err(IllegalArgumentException::new(
- "Retry delay cannot be zero",
- ));
+ return Err(IllegalArgumentException::new("Retry delay cannot be zero"));
}
self.retry_delay = retry_delay;
@@ -836,7 +852,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is configured
- pub fn set_reverse_heartbeat_interval(&mut self, reverse_heartbeat_interval: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_reverse_heartbeat_interval(
+ &mut self,
+ reverse_heartbeat_interval: u64,
+ ) -> Result<(), IllegalArgumentException> {
if reverse_heartbeat_interval == 0 {
self.reverse_heartbeat_interval = reverse_heartbeat_interval;
return Ok(());
@@ -919,7 +938,10 @@ impl ConnectionOptions {
/// # Raises
///
/// * `IllegalArgumentException`: if a negative value is passed.
- pub fn set_session_recovery_timeout(&mut self, session_recovery_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_session_recovery_timeout(
+ &mut self,
+ session_recovery_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if session_recovery_timeout == 0 {
self.session_recovery_timeout = session_recovery_timeout;
return Ok(());
@@ -987,7 +1009,10 @@ impl ConnectionOptions {
/// See also `setReconnectTimeout()`
///
/// See also `setKeepaliveInterval()`
- pub fn set_stalled_timeout(&mut self, stalled_timeout: u64) -> Result<(), IllegalArgumentException> {
+ pub fn set_stalled_timeout(
+ &mut self,
+ stalled_timeout: u64,
+ ) -> Result<(), IllegalArgumentException> {
if stalled_timeout == 0 {
return Err(IllegalArgumentException::new(
"Stalled timeout cannot be zero",
@@ -1101,7 +1126,10 @@ impl Debug for ConnectionOptions {
.field("first_retry_max_delay", &self.first_retry_max_delay)
.field("forced_transport", &self.forced_transport)
.field("http_extra_headers", &self.http_extra_headers)
- .field("http_extra_headers_on_session_creation_only", &self.http_extra_headers_on_session_creation_only)
+ .field(
+ "http_extra_headers_on_session_creation_only",
+ &self.http_extra_headers_on_session_creation_only,
+ )
.field("idle_timeout", &self.idle_timeout)
.field("keepalive_interval", &self.keepalive_interval)
.field("polling_interval", &self.polling_interval)
@@ -1110,8 +1138,14 @@ impl Debug for ConnectionOptions {
.field("reconnect_timeout", &self.reconnect_timeout)
.field("requested_max_bandwidth", &self.requested_max_bandwidth)
.field("retry_delay", &self.retry_delay)
- .field("reverse_heartbeat_interval", &self.reverse_heartbeat_interval)
- .field("server_instance_address_ignored", &self.server_instance_address_ignored)
+ .field(
+ "reverse_heartbeat_interval",
+ &self.reverse_heartbeat_interval,
+ )
+ .field(
+ "server_instance_address_ignored",
+ &self.server_instance_address_ignored,
+ )
.field("session_recovery_timeout", &self.session_recovery_timeout)
.field("slowing_enabled", &self.slowing_enabled)
.field("stalled_timeout", &self.stalled_timeout)
@@ -1133,7 +1167,7 @@ impl Default for ConnectionOptions {
proxy: None,
real_max_bandwidth: None,
reconnect_timeout: 3000,
- reduce_head: false,
+ _reduce_head: false,
requested_max_bandwidth: None,
retry_delay: 4000,
reverse_heartbeat_interval: 0,
diff --git a/src/error.rs b/src/error.rs
index cd1d7c2..33c6ff8 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,5 +1,5 @@
-use std::fmt;
use std::error::Error;
+use std::fmt;
#[derive(Debug)]
pub struct IllegalArgumentException(String);
@@ -24,18 +24,20 @@ impl Error for IllegalArgumentException {
#[derive(Debug)]
pub struct IllegalStateException {
- details: String
+ details: String,
}
impl IllegalStateException {
pub fn new(msg: &str) -> IllegalStateException {
- IllegalStateException{details: msg.to_string()}
+ IllegalStateException {
+ details: msg.to_string(),
+ }
}
}
impl fmt::Display for IllegalStateException {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f,"{}",self.details)
+ write!(f, "{}", self.details)
}
}
@@ -43,4 +45,4 @@ impl Error for IllegalStateException {
fn description(&self) -> &str {
&self.details
}
-} \ No newline at end of file
+}
diff --git a/src/item_update.rs b/src/item_update.rs
index 7c43d93..fd95f90 100644
--- a/src/item_update.rs
+++ b/src/item_update.rs
@@ -133,7 +133,10 @@ impl ItemUpdate {
.iter()
.find(|(name, _)| self.get_field_position(name) == pos)
.and_then(|(_, value)| value.as_deref()),
- Err(_) => self.fields.get(field_name_or_pos).and_then(|v| v.as_deref()),
+ Err(_) => self
+ .fields
+ .get(field_name_or_pos)
+ .and_then(|v| v.as_deref()),
}
}
@@ -162,10 +165,7 @@ impl ItemUpdate {
/// # Returns
/// A JSON Patch structure representing the difference between the new value and the previous one,
/// or None if the difference in JSON Patch format is not available for any reason.
- pub fn get_value_as_json_patch_if_available(
- &self,
- _field_name_or_pos: &str,
- ) -> Option<String> {
+ pub fn get_value_as_json_patch_if_available(&self, _field_name_or_pos: &str) -> Option<String> {
// Implementation pending
None
}
@@ -232,10 +232,10 @@ impl ItemUpdate {
///
/// # Returns
/// The 1-based position of the field within the field list or field schema.
- fn get_field_position(&self, field_name: &str) -> usize {
+ fn get_field_position(&self, _field_name: &str) -> usize {
// Implementation pending
// This method should return the 1-based position of the field based on the field list or field schema
// If the field is not found, it should raise an IllegalArgumentException
unimplemented!()
}
-} \ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index f42f13a..5200961 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,11 +1,11 @@
pub mod client_listener;
pub mod client_message_listener;
-pub mod error;
-pub mod item_update;
-pub mod subscription_listener;
pub mod connection_details;
pub mod connection_options;
+pub mod error;
+pub mod item_update;
pub mod ls_client;
pub mod proxy;
pub mod subscription;
+pub mod subscription_listener;
pub mod util;
diff --git a/src/ls_client.rs b/src/ls_client.rs
index b65885c..39d7c8b 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -315,7 +315,8 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
- let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new();
+ let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> =
+ HashMap::new();
loop {
tokio::select! {
message = read_stream.next() => {
@@ -488,7 +489,7 @@ impl LightstreamerClient {
// is always a snapshot.
if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
if let Some(_) = item_updates.get(&(item_index)) {
- // Item update already exists in item_updates, so it's not a snapshot.
+ // Item update already exists in item_updates, so it's not a snapshot.
false
} else {
// Item update doesn't exist in item_updates, so the first update is always a snapshot.
diff --git a/src/main.rs b/src/main.rs
index c284605..2da215c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,11 +3,12 @@ use lightstreamer_client::ls_client::{LightstreamerClient, Transport};
use lightstreamer_client::subscription::{Snapshot, Subscription, SubscriptionMode};
use lightstreamer_client::subscription_listener::SubscriptionListener;
+use colored::*;
use signal_hook::low_level::signal_name;
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use std::error::Error;
use std::sync::Arc;
-use tokio::sync::{Notify, Mutex};
+use tokio::sync::{Mutex, Notify};
const MAX_CONNECTION_ATTEMPTS: u64 = 1;
@@ -44,34 +45,33 @@ pub struct MySubscriptionListener {}
impl SubscriptionListener for MySubscriptionListener {
fn on_item_update(&self, update: &ItemUpdate) {
- println!(
- "UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}",
- update.item_name.as_ref().unwrap_or(&"N/A".to_string()),
+ let not_available = "N/A".to_string();
+ let item_name = update.item_name.clone().unwrap_or(not_available.clone());
+ let fields = vec![
"stock_name",
- update.get_value("stock_name").unwrap_or(&"N/A".to_string()),
"last_price",
- update.get_value("last_price").unwrap_or(&"N/A".to_string()),
"time",
- update.get_value("time").unwrap_or(&"N/A".to_string()),
"pct_change",
- update.get_value("pct_change").unwrap_or(&"N/A".to_string()),
"bid_quantity",
- update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()),
"bid",
- update.get_value("bid").unwrap_or(&"N/A".to_string()),
"ask",
- update.get_value("ask").unwrap_or(&"N/A".to_string()),
"ask_quantity",
- update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()),
"min",
- update.get_value("min").unwrap_or(&"N/A".to_string()),
"max",
- update.get_value("max").unwrap_or(&"N/A".to_string()),
"ref_price",
- update.get_value("ref_price").unwrap_or(&"N/A".to_string()),
"open_price",
- update.get_value("open_price").unwrap_or(&"N/A".to_string()),
- );
+ ];
+ let mut output = String::new();
+ for field in fields {
+ let value = update.get_value(field).unwrap_or(&not_available).clone();
+ let value_str = if update.changed_fields.contains_key(field) {
+ value.yellow().to_string()
+ } else {
+ value.to_string()
+ };
+ output.push_str(&format!("{}: {}, ", field, value_str));
+ }
+ println!("{}, {}", item_name, output);
}
}
@@ -128,7 +128,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
{
let mut client = client.lock().await;
client.subscribe(my_subscription);
- client.connection_options.set_forced_transport(Some(Transport::WsStreaming));
+ client
+ .connection_options
+ .set_forced_transport(Some(Transport::WsStreaming));
}
// Create a new Notify instance to send a shutdown signal to the signal handler thread.
@@ -163,7 +165,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
if retry_counter == MAX_CONNECTION_ATTEMPTS {
- println!("Failed to connect after {} retries. Exiting...", retry_counter);
+ println!(
+ "Failed to connect after {} retries. Exiting...",
+ retry_counter
+ );
} else {
println!("Exiting orderly from Lightstreamer client...");
}
diff --git a/src/proxy.rs b/src/proxy.rs
index 9a8add8..a420e67 100644
--- a/src/proxy.rs
+++ b/src/proxy.rs
@@ -80,4 +80,4 @@ pub enum ProxyType {
Socks4,
/// SOCKS5 proxy.
Socks5,
-} \ No newline at end of file
+}
diff --git a/src/subscription.rs b/src/subscription.rs
index 9252ee6..135996b 100644
--- a/src/subscription.rs
+++ b/src/subscription.rs
@@ -39,18 +39,6 @@ pub enum SubscriptionMode {
Command,
}
-impl SubscriptionMode {
- fn from_str(s: &str) -> Result<SubscriptionMode, String> {
- match s.to_lowercase().as_str() {
- "merge" => Ok(SubscriptionMode::Merge),
- "distinct" => Ok(SubscriptionMode::Distinct),
- "raw" => Ok(SubscriptionMode::Raw),
- "command" => Ok(SubscriptionMode::Command),
- _ => Err(format!("Invalid subscription mode: {}", s)),
- }
- }
-}
-
impl ToString for SubscriptionMode {
fn to_string(&self) -> String {
match self {
@@ -171,15 +159,15 @@ impl Subscription {
/// # See also
/// `addListener()`
pub fn remove_listener<T>(&mut self, listener: &T)
- where
- T: SubscriptionListener,
- {
- self.listeners.retain(|l| {
- let l_ref = l.as_ref() as &dyn SubscriptionListener;
- let listener_ref = listener as &dyn SubscriptionListener;
- !(std::ptr::addr_of!(*l_ref) == std::ptr::addr_of!(*listener_ref))
- });
- }
+ where
+ T: SubscriptionListener,
+ {
+ self.listeners.retain(|l| {
+ let l_ref = l.as_ref() as &dyn SubscriptionListener;
+ let listener_ref = listener as &dyn SubscriptionListener;
+ !(std::ptr::addr_of!(*l_ref) == std::ptr::addr_of!(*listener_ref))
+ });
+ }
/// Returns a list containing the SubscriptionListener instances that were added to this client.
///
@@ -404,7 +392,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFieldSchema()`
- pub fn set_command_second_level_data_adapter(&mut self, adapter: Option<String>) -> Result<(), String> {
+ pub fn set_command_second_level_data_adapter(
+ &mut self,
+ adapter: Option<String>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -457,7 +448,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFields()`
- pub fn set_command_second_level_field_schema(&mut self, schema: Option<String>) -> Result<(), String> {
+ pub fn set_command_second_level_field_schema(
+ &mut self,
+ schema: Option<String>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -511,7 +505,10 @@ impl Subscription {
///
/// # See also
/// `Subscription.setCommandSecondLevelFieldSchema()`
- pub fn set_command_second_level_fields(&mut self, fields: Option<Vec<String>>) -> Result<(), String> {
+ pub fn set_command_second_level_fields(
+ &mut self,
+ fields: Option<Vec<String>>,
+ ) -> Result<(), String> {
if self.is_active {
return Err("Subscription is active".to_string());
}
@@ -763,7 +760,12 @@ impl Subscription {
///
/// # Returns
/// The current value for the specified field of the specified key within the specified item (possibly `None`), or `None` if the specified key has not been added yet (note that it might have been added and eventually deleted).
- pub fn get_command_value(&self, item_pos: usize, key: &str, field_pos: usize) -> Option<&String> {
+ pub fn get_command_value(
+ &self,
+ item_pos: usize,
+ key: &str,
+ field_pos: usize,
+ ) -> Option<&String> {
let key = format!("{}_{}", item_pos, key);
self.command_values
.get(&key)
@@ -842,7 +844,9 @@ impl Subscription {
return None;
}
if let Some(ref schema) = self.field_schema {
- return schema.split(',').position(|field| field.trim() == "command");
+ return schema
+ .split(',')
+ .position(|field| field.trim() == "command");
}
None
}
@@ -896,9 +900,18 @@ impl Debug for Subscription {
.field("field_schema", &self.field_schema)
.field("fields", &self.fields)
.field("data_adapter", &self.data_adapter)
- .field("command_second_level_data_adapter", &self.command_second_level_data_adapter)
- .field("command_second_level_field_schema", &self.command_second_level_field_schema)
- .field("command_second_level_fields", &self.command_second_level_fields)
+ .field(
+ "command_second_level_data_adapter",
+ &self.command_second_level_data_adapter,
+ )
+ .field(
+ "command_second_level_field_schema",
+ &self.command_second_level_field_schema,
+ )
+ .field(
+ "command_second_level_fields",
+ &self.command_second_level_fields,
+ )
.field("requested_buffer_size", &self.requested_buffer_size)
.field("requested_max_frequency", &self.requested_max_frequency)
.field("requested_snapshot", &self.requested_snapshot)
@@ -907,4 +920,4 @@ impl Debug for Subscription {
.field("is_subscribed", &self.is_subscribed)
.finish()
}
-} \ No newline at end of file
+}
diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs
index 72958fe..d05a6a3 100644
--- a/src/subscription_listener.rs
+++ b/src/subscription_listener.rs
@@ -57,7 +57,9 @@ pub trait SubscriptionListener: Send {
/// - `Subscription::set_command_second_level_field_schema()`
fn on_command_second_level_item_lost_updates(&mut self, _lost_updates: u32, _key: &str) {
// Default implementation does nothing.
- unimplemented!("Implement on_command_second_level_item_lost_updates method for SubscriptionListener.");
+ unimplemented!(
+ "Implement on_command_second_level_item_lost_updates method for SubscriptionListener."
+ );
}
/// Event handler that is called when the Server notifies an error on a second-level subscription.
@@ -89,9 +91,16 @@ pub trait SubscriptionListener: Send {
/// - `ConnectionDetails::set_adapter_set()`
/// - `Subscription::set_command_second_level_fields()`
/// - `Subscription::set_command_second_level_field_schema()`
- fn on_command_second_level_subscription_error(&mut self, _code: i32, _message: Option<&str>, _key: &str) {
+ fn on_command_second_level_subscription_error(
+ &mut self,
+ _code: i32,
+ _message: Option<&str>,
+ _key: &str,
+ ) {
// Default implementation does nothing.
- unimplemented!("Implement on_command_second_level_subscription_error method for SubscriptionListener.");
+ unimplemented!(
+ "Implement on_command_second_level_subscription_error method for SubscriptionListener."
+ );
}
/// Event handler that is called by Lightstreamer to notify that all snapshot events for an item
@@ -143,7 +152,12 @@ pub trait SubscriptionListener: Send {
/// # See also
///
/// - `Subscription::set_requested_max_frequency()`
- fn on_item_lost_updates(&mut self, _item_name: Option<&str>, _item_pos: usize, _lost_updates: u32) {
+ fn on_item_lost_updates(
+ &mut self,
+ _item_name: Option<&str>,
+ _item_pos: usize,
+ _lost_updates: u32,
+ ) {
// Default implementation does nothing.
unimplemented!("Implement on_item_lost_updates method for SubscriptionListener.");
}
@@ -273,4 +287,4 @@ pub trait SubscriptionListener: Send {
fn on_unsubscription(&mut self) {
// Default implementation does nothing.
}
-} \ No newline at end of file
+}
diff --git a/src/util.rs b/src/util.rs
index 8de27e3..60b2967 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,4 +1,4 @@
/// Clean the message from newlines and carriage returns and convert it to lowercase.
pub fn clean_message(text: &str) -> String {
text.replace("\n", "").replace("\r", "").to_lowercase()
-} \ No newline at end of file
+}