aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 12:21:12 +0200
committerLibravatar daniloaz <daniloaz@gmail.com>2024-04-13 12:21:12 +0200
commit5af8a69942aeed85b5cd2bd84cb5f33169834690 (patch)
treed3910ffc83037fd931d665112b076070f01b9295
parent82a9d2f0708c7f3754d5c40a60fa291bb320301b (diff)
✨ (Cargo.toml): Bump package version to 0.1.8 for new changes
♻️ (item_update.rs): Refactor ItemUpdate struct and related methods to store only non-null changed fields ♻️ (ls_client.rs): Refactor data update handling to store updates in a HashMap and call on_item_update for each listener 🐛 (ls_client.rs): Fix item index off-by-one error in data update handling 🐛 (main.rs): Update on_item_update implementation to handle new ItemUpdate structure ♻️ (subscription_listener.rs): Refactor on_item_update method to take a reference to ItemUpdate
-rw-r--r--Cargo.toml2
-rw-r--r--src/item_update.rs6
-rw-r--r--src/ls_client.rs65
-rw-r--r--src/main.rs31
-rw-r--r--src/subscription_listener.rs2
5 files changed, 66 insertions, 40 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 3d0bae7..f11f715 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "lightstreamer-client"
-version = "0.1.7"
+version = "0.1.8"
edition = "2021"
authors = ["Daniel López Azaña <daniloaz@gmail.com>"]
description = "A Rust client for Lightstreamer, designed to facilitate real-time communication with Lightstreamer servers."
diff --git a/src/item_update.rs b/src/item_update.rs
index 6c21882..7c43d93 100644
--- a/src/item_update.rs
+++ b/src/item_update.rs
@@ -28,7 +28,7 @@ pub struct ItemUpdate {
pub item_name: Option<String>,
pub item_pos: usize,
pub fields: HashMap<String, Option<String>>,
- pub changed_fields: HashMap<String, Option<String>>,
+ pub changed_fields: HashMap<String, String>,
pub is_snapshot: bool,
}
@@ -46,7 +46,7 @@ impl ItemUpdate {
///
/// # Returns
/// A map containing the values for each field changed with the last server update.
- pub fn get_changed_fields(&self) -> HashMap<String, Option<String>> {
+ pub fn get_changed_fields(&self) -> HashMap<String, String> {
self.changed_fields.clone()
}
@@ -60,7 +60,7 @@ impl ItemUpdate {
///
/// # Returns
/// A map containing the values for each field changed with the last server update.
- pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Option<String>> {
+ pub fn get_changed_fields_by_position(&self) -> HashMap<usize, String> {
self.changed_fields
.iter()
.map(|(name, value)| (self.get_field_position(name), value.clone()))
diff --git a/src/ls_client.rs b/src/ls_client.rs
index 64b4a4f..b65885c 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -315,7 +315,7 @@ impl LightstreamerClient {
let mut request_id: usize = 0;
let mut _session_id: Option<String> = None;
let mut subscription_id: usize = 0;
- let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new();
+ let mut subscription_item_updates: HashMap<usize, HashMap<usize, ItemUpdate>> = HashMap::new();
loop {
tokio::select! {
message = read_stream.next() => {
@@ -445,7 +445,7 @@ impl LightstreamerClient {
// Data updates from server.
//
"u" => {
- println!("Received data update from server: '{}'", clean_text);
+ //println!("Received data update from server: '{}'", clean_text);
// Parse arguments from the received message.
let arguments = clean_text.split(",").collect::<Vec<&str>>();
//
@@ -464,7 +464,7 @@ impl LightstreamerClient {
//
let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0);
let item = match subscription.get_items() {
- Some(items) => items.get(item_index),
+ Some(items) => items.get(item_index-1),
None => {
println!("No items found in subscription: {:?}", subscription);
continue;
@@ -486,13 +486,16 @@ impl LightstreamerClient {
} else {
// If item doesn't exist in item_updates yet, the first update
// is always a snapshot.
- if let Some(item_updates) = item_updates.get(subscription_index) {
- if let Some(_) = item_updates.get(item_index) {
+ if let Some(item_updates) = subscription_item_updates.get(&(subscription_index)) {
+ if let Some(_) = item_updates.get(&(item_index)) {
+ // Item update already exists in item_updates, so it's not a snapshot.
false
} else {
+ // Item update doesn't exist in item_updates, so the first update is always a snapshot.
true
}
} else {
+ // Item updates not found for subscription, so the first update is always a snapshot.
true
}
}
@@ -597,29 +600,36 @@ impl LightstreamerClient {
}
}
- //println!("Field values: {:?}", field_map);
-
- let changed_fields: HashMap<String, Option<String>> = field_map.iter()
- .map(|(k, v)| (k.clone(), v.clone()))
+ // Store only item_update's changed fields.
+ let changed_fields: HashMap<String, String> = field_map.iter()
+ .filter_map(|(k, v)| {
+ if let Some(v) = v {
+ Some((k.clone(), v.clone()))
+ } else {
+ None
+ }
+ })
.collect();
- //println!("Changed fields: {:?}", changed_fields);
//
// Take the proper item_update from item_updates and update it with changed fields.
// If the item_update doesn't exist yet, create a new one.
//
- match item_updates.get_mut(subscription_index) {
- Some(item_updates) => match item_updates.get_mut(item_index) {
+ let current_item_update: ItemUpdate;
+ match subscription_item_updates.get_mut(&(subscription_index)) {
+ Some(item_updates) => match item_updates.get_mut(&(item_index)) {
Some(item_update) => {
//
// Iterate changed_fields and update existing item_update.fields assigning the new values.
//
for (field_name, new_value) in &changed_fields {
if item_update.fields.contains_key(field_name) {
- item_update.fields.insert((*field_name).clone(), new_value.clone());
+ item_update.fields.insert((*field_name).clone(), Some(new_value.clone()));
}
}
item_update.changed_fields = changed_fields;
+ item_update.is_snapshot = is_snapshot;
+ current_item_update = item_update.clone();
},
None => {
// Create a new item_update and add it to item_updates.
@@ -630,7 +640,8 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
- item_updates.push(item_update);
+ current_item_update = item_update.clone();
+ item_updates.insert(item_index, item_update);
}
},
None => {
@@ -642,28 +653,20 @@ impl LightstreamerClient {
changed_fields: changed_fields,
is_snapshot: is_snapshot,
};
- item_updates.push(vec![item_update]);
+ current_item_update = item_update.clone();
+ let mut item_updates = HashMap::new();
+ item_updates.insert(item_index, item_update);
+ subscription_item_updates.insert(subscription_index, item_updates);
}
};
- //println!("Item updates: {:?}", item_updates);
+ // Get mutable subscription listeners directly.
+ let subscription_listeners = subscription.get_listeners();
- println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap());
- println!("\n\n");
-
- if item_updates.len() >= 3 {
- return Ok(());
- }
-
- /*
- if let Some(item_updates) = item_updates.get_mut(subscription_index) {
- if let Some(item_update) = item_updates.get_mut(item_index) {
- for (field_name, field_value) in changed_fields {
- item_update.set_field_value(field_name, field_value);
- }
- }
+ // Iterate subscription listeners and call on_item_update for each listener.
+ for listener in subscription_listeners {
+ listener.on_item_update(&current_item_update);
}
- */
}
//
// Connection confirmation from server.
diff --git a/src/main.rs b/src/main.rs
index 4959652..c284605 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,11 +43,34 @@ async fn setup_signal_hook(shutdown_signal: Arc<Notify>) {
pub struct MySubscriptionListener {}
impl SubscriptionListener for MySubscriptionListener {
- fn on_item_update(&mut self, update: ItemUpdate) {
+ fn on_item_update(&self, update: &ItemUpdate) {
println!(
- "UPDATE {} {}",
- update.get_value("stock_name").unwrap(),
- update.get_value("last_price").unwrap()
+ "UPDATE for item '{}' => '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}, '{}': {}",
+ update.item_name.as_ref().unwrap_or(&"N/A".to_string()),
+ "stock_name",
+ update.get_value("stock_name").unwrap_or(&"N/A".to_string()),
+ "last_price",
+ update.get_value("last_price").unwrap_or(&"N/A".to_string()),
+ "time",
+ update.get_value("time").unwrap_or(&"N/A".to_string()),
+ "pct_change",
+ update.get_value("pct_change").unwrap_or(&"N/A".to_string()),
+ "bid_quantity",
+ update.get_value("bid_quantity").unwrap_or(&"N/A".to_string()),
+ "bid",
+ update.get_value("bid").unwrap_or(&"N/A".to_string()),
+ "ask",
+ update.get_value("ask").unwrap_or(&"N/A".to_string()),
+ "ask_quantity",
+ update.get_value("ask_quantity").unwrap_or(&"N/A".to_string()),
+ "min",
+ update.get_value("min").unwrap_or(&"N/A".to_string()),
+ "max",
+ update.get_value("max").unwrap_or(&"N/A".to_string()),
+ "ref_price",
+ update.get_value("ref_price").unwrap_or(&"N/A".to_string()),
+ "open_price",
+ update.get_value("open_price").unwrap_or(&"N/A".to_string()),
);
}
}
diff --git a/src/subscription_listener.rs b/src/subscription_listener.rs
index f37c703..72958fe 100644
--- a/src/subscription_listener.rs
+++ b/src/subscription_listener.rs
@@ -156,7 +156,7 @@ pub trait SubscriptionListener: Send {
/// - `update`: a value object containing the updated values for all the fields, together with
/// meta-information about the update itself and some helper methods that can be used to
/// iterate through all or new values.
- fn on_item_update(&mut self, _update: ItemUpdate) {
+ fn on_item_update(&self, _update: &ItemUpdate) {
// Default implementation does nothing.
unimplemented!("Implement on_item_update method for SubscriptionListener.");
}