diff options
author | 2024-04-12 20:38:43 +0200 | |
---|---|---|
committer | 2024-04-12 20:38:43 +0200 | |
commit | 82a9d2f0708c7f3754d5c40a60fa291bb320301b (patch) | |
tree | a36179f1eb02b7691f28ea544ccf25df646c9021 | |
parent | 7d7f380e3075be51198f0ad457cc766f0641d984 (diff) |
Implemented part of the item update logic.
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/item_update.rs | 501 | ||||
-rw-r--r-- | src/ls_client.rs | 226 | ||||
-rw-r--r-- | src/main.rs | 24 |
4 files changed, 403 insertions, 349 deletions
@@ -14,6 +14,7 @@ documentation = "https://github.com/daniloaz/lightstreamer-client#readme" cookie = { version = "0", features = ["percent-encode"]} futures = "0" futures-util = "0" +json-patch = "1" reqwest = { version = "0", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } diff --git a/src/item_update.rs b/src/item_update.rs index 631267b..6c21882 100644 --- a/src/item_update.rs +++ b/src/item_update.rs @@ -1,428 +1,241 @@ use std::collections::HashMap; -use serde_json::Value; + +use serde::Serialize; /// Contains all the information related to an update of the field values for an item. /// It reports all the new values of the fields. /// +/// COMMAND Subscription: /// If the involved Subscription is a COMMAND Subscription, then the values for the current update /// are meant as relative to the same key. /// -/// Moreover, if the involved Subscription has a two-level behavior enabled, then each update may -/// be associated with either a first-level or a second-level item. In this case, the reported fields -/// are always the union of the first-level and second-level fields, and each single update can only -/// change either the first-level or the second-level fields (but for the "command" field, which is -/// first-level and is always set to "UPDATE" upon a second-level update). When the two-level behavior -/// is enabled, in all methods where a field name has to be supplied, the following convention should -/// be followed: +/// Moreover, if the involved Subscription has a two-level behavior enabled, then each update may be +/// associated with either a first-level or a second-level item. In this case, the reported fields are +/// always the union of the first-level and second-level fields and each single update can only change +/// either the first-level or the second-level fields (but for the "command" field, which is first-level +/// and is always set to "UPDATE" upon a second-level update); note that the second-level field values +/// are always None until the first second-level update occurs). When the two-level behavior is enabled, +/// in all methods where a field name has to be supplied, the following convention should be followed: /// -/// - The field name can always be used, both for the first-level and the second-level fields. -/// In case of name conflict, the first-level field is meant. -/// - The field position can always be used; however, the field positions for the second-level fields -/// start at the highest position of the first-level field list + 1. If a field schema had been -/// specified for either first-level or second-level Subscriptions, then client-side knowledge of -/// the first-level schema length would be required. +/// - The field name can always be used, both for the first-level and the second-level fields. In case of +/// name conflict, the first-level field is meant. +/// - The field position can always be used; however, the field positions for the second-level fields start +/// at the highest position of the first-level field list + 1. If a field schema had been specified for +/// either first-level or second-level Subscriptions, then client-side knowledge of the first-level schema +/// length would be required. +#[derive(Debug, Clone, Serialize)] pub struct ItemUpdate { - changed_fields: HashMap<String, Value>, - fields: HashMap<String, Value>, - item_name: Option<String>, - item_pos: usize, - is_snapshot: bool, - prev_values: HashMap<String, Value>, + pub item_name: Option<String>, + pub item_pos: usize, + pub fields: HashMap<String, Option<String>>, + pub changed_fields: HashMap<String, Option<String>>, + pub is_snapshot: bool, } impl ItemUpdate { /// Returns a map containing the values for each field changed with the last server update. - /// The related field name is used as key for the values in the map. - /// - /// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed - /// fields are meant as relative to the previous update for the same key. On such tables if a - /// DELETE command is received, all the fields, excluding the key field, will be present as - /// changed, with None value. All of this is also true on tables that have the two-level behavior - /// enabled, but in case of DELETE commands second-level fields will not be iterated. + /// The related field name is used as key for the values in the map. Note that if the Subscription + /// mode of the involved Subscription is COMMAND, then changed fields are meant as relative to the + /// previous update for the same key. On such tables if a DELETE command is received, all the fields, + /// excluding the key field, will be present as changed, with None value. All of this is also true on + /// tables that have the two-level behavior enabled, but in case of DELETE commands second-level fields + /// will not be iterated. /// - /// # Errors + /// # Raises + /// - `IllegalStateException` – if the Subscription was initialized using a field schema. /// - /// Returns an `IllegalStateException` if the Subscription was initialized using a field schema. - /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// let item_update = ItemUpdate { - /// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// let changed_fields = item_update.get_changed_fields(); - /// assert_eq!(changed_fields.len(), 2); - /// assert_eq!(changed_fields.get("foo"), Some(&json!(42))); - /// assert_eq!(changed_fields.get("bar"), Some(&json!("baz"))); - /// ``` - pub fn get_changed_fields(&self) -> &HashMap<String, Value> { - &self.changed_fields + /// # Returns + /// A map containing the values for each field changed with the last server update. + pub fn get_changed_fields(&self) -> HashMap<String, Option<String>> { + self.changed_fields.clone() } /// Returns a map containing the values for each field changed with the last server update. - /// The 1-based field position within the field schema or field list is used as key for the - /// values in the map. + /// The 1-based field position within the field schema or field list is used as key for the values in + /// the map. Note that if the Subscription mode of the involved Subscription is COMMAND, then changed + /// fields are meant as relative to the previous update for the same key. On such tables if a DELETE + /// command is received, all the fields, excluding the key field, will be present as changed, with None + /// value. All of this is also true on tables that have the two-level behavior enabled, but in case of + /// DELETE commands second-level fields will not be iterated. /// - /// Note that if the Subscription mode of the involved Subscription is COMMAND, then changed - /// fields are meant as relative to the previous update for the same key. On such tables if a - /// DELETE command is received, all the fields, excluding the key field, will be present as - /// changed, with None value. All of this is also true on tables that have the two-level behavior - /// enabled, but in case of DELETE commands second-level fields will not be iterated. - /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// use std::collections::HashMap; - /// let mut changed_fields_by_pos = HashMap::new(); - /// changed_fields_by_pos.insert(1, json!(42)); - /// changed_fields_by_pos.insert(2, json!("baz")); - /// let item_update = ItemUpdate { - /// changed_fields: changed_fields_by_pos, - /// fields: HashMap::new(), - /// item_name: None, - /// item_pos: 0, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// let changed_fields = item_update.get_changed_fields_by_position(); - /// assert_eq!(changed_fields.len(), 2); - /// assert_eq!(changed_fields.get(&1), Some(&json!(42))); - /// assert_eq!(changed_fields.get(&2), Some(&json!("baz"))); - /// ``` - pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Value> { - // Convert the changed_fields HashMap to a HashMap with usize keys - let changed_fields_by_pos: HashMap<usize, Value> = self - .changed_fields + /// # Returns + /// A map containing the values for each field changed with the last server update. + pub fn get_changed_fields_by_position(&self) -> HashMap<usize, Option<String>> { + self.changed_fields .iter() - .enumerate() - .map(|(i, (_k, v))| (i + 1, v.clone())) - .collect(); - changed_fields_by_pos + .map(|(name, value)| (self.get_field_position(name), value.clone())) + .collect() } /// Returns a map containing the values for each field in the Subscription. /// The related field name is used as key for the values in the map. /// - /// # Errors + /// # Raises + /// - `IllegalStateException` – if the Subscription was initialized using a field schema. /// - /// Returns an `IllegalStateException` if the Subscription was initialized using a field schema. - /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// let item_update = ItemUpdate { - /// changed_fields: HashMap::new(), - /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// let fields = item_update.get_fields(); - /// assert_eq!(fields.len(), 2); - /// assert_eq!(fields.get("foo"), Some(&json!(42))); - /// assert_eq!(fields.get("bar"), Some(&json!("baz"))); - /// ``` - pub fn get_fields(&self) -> &HashMap<String, Value> { - &self.fields + /// # Returns + /// A map containing the values for each field in the Subscription. + pub fn get_fields(&self) -> HashMap<String, Option<String>> { + self.fields.clone() } /// Returns a map containing the values for each field in the Subscription. - /// The 1-based field position within the field schema or field list is used as key for the - /// values in the map. + /// The 1-based field position within the field schema or field list is used as key for the values in the map. /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// use std::collections::HashMap; - /// let mut fields_by_pos = HashMap::new(); - /// fields_by_pos.insert(1, json!(42)); - /// fields_by_pos.insert(2, json!("baz")); - /// let item_update = ItemUpdate { - /// changed_fields: HashMap::new(), - /// fields: fields_by_pos, - /// item_name: None, - /// item_pos: 0, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// let fields = item_update.get_fields_by_position(); - /// assert_eq!(fields.len(), 2); - /// assert_eq!(fields.get(&1), Some(&json!(42))); - /// assert_eq!(fields.get(&2), Some(&json!("baz"))); - /// ``` - pub fn get_fields_by_position(&self) -> &HashMap<String, Value> { - &self.fields + /// # Returns + /// A map containing the values for each field in the Subscription. + pub fn get_fields_by_position(&self) -> HashMap<usize, Option<String>> { + self.fields + .iter() + .map(|(name, value)| (self.get_field_position(name), value.clone())) + .collect() } /// Inquiry method that retrieves the name of the item to which this update pertains. /// - /// The name will be `None` if the related Subscription was initialized using an "Item Group". + /// The name will be None if the related Subscription was initialized using an "Item Group". /// - /// # Examples - /// - /// ``` - /// let item_update = ItemUpdate { - /// changed_fields: HashMap::new(), - /// fields: HashMap::new(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// assert_eq!(item_update.get_item_name(), Some("item1".to_string())); - /// ``` - pub fn get_item_name(&self) -> Option<&String> { - self.item_name.as_ref() + /// # Returns + /// The name of the item to which this update pertains. + pub fn get_item_name(&self) -> Option<&str> { + self.item_name.as_deref() } /// Inquiry method that retrieves the position in the "Item List" or "Item Group" of the item /// to which this update pertains. /// - /// # Examples - /// - /// ``` - /// let item_update = ItemUpdate { - /// changed_fields: HashMap::new(), - /// fields: HashMap::new(), - /// item_name: None, - /// item_pos: 5, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// assert_eq!(item_update.get_item_pos(), 5); - /// ``` + /// # Returns + /// The 1-based position of the item to which this update pertains. pub fn get_item_pos(&self) -> usize { self.item_pos } - /// Inquiry method that gets the value for a specified field, as received from the Server with - /// the current or previous update. - /// - /// # Errors + /// Inquiry method that gets the value for a specified field, as received from the Server with the + /// current or previous update. /// - /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription. + /// # Raises + /// - `IllegalArgumentException` – if the specified field is not part of the Subscription. /// /// # Parameters - /// - /// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field - /// List" or "Field Schema". + /// - `field_name_or_pos` – The field name or the 1-based position of the field within the "Field List" or "Field Schema". /// /// # Returns - /// - /// The value of the specified field; it can be `None` in the following cases: - /// - A `None` value has been received from the Server, as `None` is a possible value for a field. - /// - No value has been received for the field yet. - /// - The item is subscribed to with the COMMAND mode and a DELETE command is received (only the - /// fields used to carry key and command information are valued). - /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// let item_update = ItemUpdate { - /// changed_fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: HashMap::new(), - /// }; - /// assert_eq!(item_update.get_value("foo"), Some(json!(42))); - /// assert_eq!(item_update.get_value("bar"), Some(json!("baz"))); - /// assert_eq!(item_update.get_value(1), Some(json!(42))); - /// assert_eq!(item_update.get_value(2), Some(json!("baz"))); - /// ``` - pub fn get_value(&self, field_name_or_pos: &str) -> Option<&Value> { - self.fields.get(field_name_or_pos) + /// The value of the specified field; it can be None in the following cases: + /// + /// - a None value has been received from the Server, as None is a possible value for a field; + /// - no value has been received for the field yet; + /// - the item is subscribed to with the COMMAND mode and a DELETE command is received (only the fields + /// used to carry key and command information are valued). + pub fn get_value(&self, field_name_or_pos: &str) -> Option<&str> { + match field_name_or_pos.parse::<usize>() { + Ok(pos) => self + .fields + .iter() + .find(|(name, _)| self.get_field_position(name) == pos) + .and_then(|(_, value)| value.as_deref()), + Err(_) => self.fields.get(field_name_or_pos).and_then(|v| v.as_deref()), + } } - /// Inquiry method that gets the difference between the new value and the previous one as a - /// JSON Patch structure, provided that the Server has used the JSON Patch format to send this - /// difference, as part of the "delta delivery" mechanism. This, in turn, requires that: - /// - /// - The Data Adapter has explicitly indicated JSON Patch as the privileged type of compression - /// for this field. - /// - Both the previous and new value are suitable for the JSON Patch computation (i.e. they are - /// valid JSON representations). - /// - The item was subscribed to in MERGE or DISTINCT mode (note that, in case of two-level - /// behavior, this holds for all fields related with second-level items, as these items are in - /// MERGE mode). - /// - Sending the JSON Patch difference has been evaluated by the Server as more efficient than - /// sending the full new value. + /// Inquiry method that gets the difference between the new value and the previous one as a JSON Patch structure, + /// provided that the Server has used the JSON Patch format to send this difference, as part of the "delta delivery" + /// mechanism. This, in turn, requires that: /// - /// Note that the last condition can be enforced by leveraging the Server's `<jsonpatch_min_length>` - /// configuration flag, so that the availability of the JSON Patch form would only depend on the - /// Client and the Data Adapter. + /// - the Data Adapter has explicitly indicated JSON Patch as the privileged type of compression for this field; + /// - both the previous and new value are suitable for the JSON Patch computation (i.e. they are valid JSON representations); + /// - the item was subscribed to in MERGE or DISTINCT mode (note that, in case of two-level behavior, this holds for all + /// fields related with second-level items, as these items are in MERGE mode); + /// - sending the JSON Patch difference has been evaluated by the Server as more efficient than sending the full new value. /// - /// When the above conditions are not met, the method just returns `None`; in this case, the - /// new value can only be determined through `ItemUpdate::get_value()`. For instance, this will - /// always be needed to get the first value received. + /// Note that the last condition can be enforced by leveraging the Server's <jsonpatch_min_length> configuration flag, + /// so that the availability of the JSON Patch form would only depend on the Client and the Data Adapter. /// - /// # Errors + /// When the above conditions are not met, the method just returns None; in this case, the new value can only be determined + /// through `ItemUpdate.get_value()`. For instance, this will always be needed to get the first value received. /// - /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription. + /// # Raises + /// - `IllegalArgumentException` – if the specified field is not part of the Subscription. /// /// # Parameters - /// - /// - `field_name_or_pos`: The field name or the 1-based position of the field within the "Field - /// List" or "Field Schema". + /// - `field_name_or_pos` – The field name or the 1-based position of the field within the "Field List" or "Field Schema". /// /// # Returns - /// /// A JSON Patch structure representing the difference between the new value and the previous one, - /// or `None` if the difference in JSON Patch format is not available for any reason. - /// - /// # Examples - /// - /// ``` - /// use serde_json::{json, Value}; - /// let mut item_update = ItemUpdate { - /// changed_fields: vec![("foo".to_string(), json!(42))] - /// .into_iter() - /// .collect(), - /// fields: vec![("foo".to_string(), json!(42))] - /// .into_iter() - /// .collect(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: vec![("foo".to_string(), json!(41))] - /// .into_iter() - /// .collect(), - /// }; - /// - /// // Assuming the Server sends a JSON Patch for the "foo" field - /// let json_patch: Value = json!([ - /// { "op": "replace", "path": "/foo", "value": 42 } - /// ]); - /// item_update.changed_fields.insert("foo".to_string(), json_patch.clone()); - /// - /// assert_eq!( - /// item_update.get_value_as_json_patch_if_available("foo"), - /// Some(&json_patch) - /// ); - /// ``` - pub fn get_value_as_json_patch_if_available(&self, field_name_or_pos: &str) -> Option<&Value> { - self.changed_fields.get(field_name_or_pos) + /// or None if the difference in JSON Patch format is not available for any reason. + pub fn get_value_as_json_patch_if_available( + &self, + _field_name_or_pos: &str, + ) -> Option<String> { + // Implementation pending + None } - /// Inquiry method that asks whether the current update belongs to the item snapshot (which carries - /// the current item state at the time of Subscription). Snapshot events are sent only if snapshot - /// information was requested for the items through `Subscription::set_requested_snapshot()` and - /// precede the real time events. Snapshot information take different forms in different subscription - /// modes and can be spanned across zero, one or several update events. In particular: - /// - /// - If the item is subscribed to with the RAW subscription mode, then no snapshot is sent by - /// the Server. - /// - If the item is subscribed to with the MERGE subscription mode, then the snapshot consists - /// of exactly one event, carrying the current value for all fields. - /// - If the item is subscribed to with the DISTINCT subscription mode, then the snapshot consists - /// of some of the most recent updates; these updates are as many as specified through - /// `Subscription::set_requested_snapshot()`, unless fewer are available. - /// - If the item is subscribed to with the COMMAND subscription mode, then the snapshot consists - /// of an "ADD" event for each key that is currently present. + /// Inquiry method that asks whether the current update belongs to the item snapshot (which carries the current item state + /// at the time of Subscription). Snapshot events are sent only if snapshot information was requested for the items through + /// `Subscription.set_requested_snapshot()` and precede the real time events. Snapshot information takes different forms in + /// different subscription modes and can be spanned across zero, one or several update events. In particular: + /// + /// - if the item is subscribed to with the RAW subscription mode, then no snapshot is sent by the Server; + /// - if the item is subscribed to with the MERGE subscription mode, then the snapshot consists of exactly one event, + /// carrying the current value for all fields; + /// - if the item is subscribed to with the DISTINCT subscription mode, then the snapshot consists of some of the most recent + /// updates; these updates are as many as specified through `Subscription.set_requested_snapshot()`, unless fewer are available; + /// - if the item is subscribed to with the COMMAND subscription mode, then the snapshot consists of an "ADD" event for each key + /// that is currently present. + /// + /// Note that, in case of two-level behavior, snapshot-related updates for both the first-level item (which is in COMMAND mode) + /// and any second-level items (which are in MERGE mode) are qualified with this flag. /// - /// Note that, in case of two-level behavior, snapshot-related updates for both the first-level - /// item (which is in COMMAND mode) and any second-level items (which are in MERGE mode) are - /// qualified with this flag. - /// - /// # Examples - /// - /// ``` - /// let item_update = ItemUpdate { - /// changed_fields: HashMap::new(), - /// fields: HashMap::new(), - /// item_name: None, - /// item_pos: 0, - /// is_snapshot: true, - /// prev_values: HashMap::new(), - /// }; - /// assert!(item_update.is_snapshot()); - /// ``` + /// # Returns + /// `true` if the current update event belongs to the item snapshot; `false` otherwise. pub fn is_snapshot(&self) -> bool { self.is_snapshot } - /// Inquiry method that asks whether the value for a field has changed after the reception of - /// the last update from the Server for an item. If the Subscription mode is COMMAND then the - /// change is meant as relative to the same key. + /// Inquiry method that asks whether the value for a field has changed after the reception of the last update from the Server + /// for an item. If the Subscription mode is COMMAND then the change is meant as relative to the same key. /// /// # Parameters - /// - /// - `field_name_or_pos`: The field name or the 1-based position of the field within the field - /// list or field schema. + /// - `field_name_or_pos` – The field name or the 1-based position of the field within the field list or field schema. /// /// # Returns - /// /// Unless the Subscription mode is COMMAND, the return value is `true` in the following cases: /// - /// - It is the first update for the item. - /// - The new field value is different than the previous field value received for the item. + /// - It is the first update for the item; + /// - the new field value is different than the previous field value received for the item. /// /// If the Subscription mode is COMMAND, the return value is `true` in the following cases: /// - /// - It is the first update for the involved key value (i.e. the event carries an "ADD" command). - /// - The new field value is different than the previous field value received for the item, - /// relative to the same key value (the event must carry an "UPDATE" command). - /// - The event carries a "DELETE" command (this applies to all fields other than the field used - /// to carry key information). + /// - it is the first update for the involved key value (i.e. the event carries an "ADD" command); + /// - the new field value is different than the previous field value received for the item, relative to the same key value + /// (the event must carry an "UPDATE" command); + /// - the event carries a "DELETE" command (this applies to all fields other than the field used to carry key information). /// /// In all other cases, the return value is `false`. /// - /// # Errors - /// - /// Returns an `IllegalArgumentException` if the specified field is not part of the Subscription. - /// - /// # Examples - /// - /// ``` - /// use serde_json::json; - /// let item_update = ItemUpdate { - /// changed_fields: vec![("foo".to_string(), json!(42))] - /// .into_iter() - /// .collect(), - /// fields: vec![("foo".to_string(), json!(42)), ("bar".to_string(), json!("baz"))] - /// .into_iter() - /// .collect(), - /// item_name: Some("item1".to_string()), - /// item_pos: 1, - /// is_snapshot: false, - /// prev_values: vec![("foo".to_string(), json!(41))] - /// .into_iter() - /// .collect(), - /// }; - /// assert!(item_update.is_value_changed("foo")); - /// assert!(!item_update.is_value_changed("bar")); - /// ``` + /// # Raises + /// - `IllegalArgumentException` – if the specified field is not part of the Subscription. pub fn is_value_changed(&self, field_name_or_pos: &str) -> bool { - if let Some(new_value) = self.fields.get(field_name_or_pos) { - if let Some(prev_value) = self.prev_values.get(field_name_or_pos) { - return new_value != prev_value; - } else { - // This is the first update for the item - return true; - } + match field_name_or_pos.parse::<usize>() { + Ok(pos) => self + .changed_fields + .iter() + .any(|(name, _)| self.get_field_position(name) == pos), + Err(_) => self.changed_fields.contains_key(field_name_or_pos), } - false + } + + /// Helper method to get the 1-based position of a field within the field list or field schema. + /// + /// # Parameters + /// - `field_name` – The name of the field. + /// + /// # Returns + /// The 1-based position of the field within the field list or field schema. + fn get_field_position(&self, field_name: &str) -> usize { + // Implementation pending + // This method should return the 1-based position of the field based on the field list or field schema + // If the field is not found, it should raise an IllegalArgumentException + unimplemented!() } }
\ No newline at end of file diff --git a/src/ls_client.rs b/src/ls_client.rs index a463216..64b4a4f 100644 --- a/src/ls_client.rs +++ b/src/ls_client.rs @@ -3,11 +3,13 @@ use crate::client_message_listener::ClientMessageListener; use crate::connection_details::ConnectionDetails; use crate::connection_options::ConnectionOptions; use crate::error::IllegalStateException; -use crate::subscription::Subscription; +use crate::item_update::ItemUpdate; +use crate::subscription::{Snapshot, Subscription, SubscriptionMode}; use crate::util::*; use cookie::Cookie; use futures_util::{SinkExt, StreamExt}; +use std::collections::HashMap; use std::error::Error; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; @@ -126,7 +128,7 @@ impl LightstreamerClient { // Constants for WebSocket connection. // pub const SEC_WEBSOCKET_KEY: &'static str = "PNDUibe9ex7PnsrLbt0N4w=="; - pub const SEC_WEBSOCKET_PROTOCOL: &'static str ="TLCP-2.4.0.lightstreamer.com"; + pub const SEC_WEBSOCKET_PROTOCOL: &'static str = "TLCP-2.4.0.lightstreamer.com"; pub const SEC_WEBSOCKET_VERSION: &'static str = "13"; pub const SEC_WEBSOCKET_UPGRADE: &'static str = "websocket"; @@ -313,6 +315,7 @@ impl LightstreamerClient { let mut request_id: usize = 0; let mut _session_id: Option<String> = None; let mut subscription_id: usize = 0; + let mut item_updates: Vec<Vec<ItemUpdate>> = Vec::new(); loop { tokio::select! { message = read_stream.next() => { @@ -443,8 +446,225 @@ impl LightstreamerClient { // "u" => { println!("Received data update from server: '{}'", clean_text); + // Parse arguments from the received message. + let arguments = clean_text.split(",").collect::<Vec<&str>>(); + // + // Extract the subscription from the first argument. + // + let subscription_index = arguments.get(1).unwrap_or(&"").parse::<usize>().unwrap_or(0); + let subscription = match self.get_subscriptions().get(subscription_index-1) { + Some(subscription) => subscription, + None => { + println!("Subscription not found for index: {}", subscription_index); + continue; + } + }; + // + // Extract the item from the second argument. + // + let item_index = arguments.get(2).unwrap_or(&"").parse::<usize>().unwrap_or(0); + let item = match subscription.get_items() { + Some(items) => items.get(item_index), + None => { + println!("No items found in subscription: {:?}", subscription); + continue; + } + }; + // + // Determine if the update is a snapshot or real-time update based on the subscription parameters. + // + let is_snapshot = match subscription.get_requested_snapshot() { + Some(ls_snapshot) => { + match ls_snapshot { + Snapshot::No => false, + Snapshot::Yes => { + match subscription.get_mode() { + SubscriptionMode::Merge => { + if arguments.len() == 4 && arguments[3] == "$" { + // EOS notification received + true + } else { + // If item doesn't exist in item_updates yet, the first update + // is always a snapshot. + if let Some(item_updates) = item_updates.get(subscription_index) { + if let Some(_) = item_updates.get(item_index) { + false + } else { + true + } + } else { + true + } + } + }, + SubscriptionMode::Distinct | SubscriptionMode::Command => { + if !subscription.is_subscribed() { + true + } else { + false + } + }, + _ => false, + } + }, + _ => false, + } + }, + None => false, + }; - }, + // Extract the field values from the third argument. + let field_values: Vec<&str> = arguments.get(3).unwrap_or(&"").split('|').collect(); + + // + // Get fields from subscription and create a HashMap of field names and values. + // + let subscription_fields = subscription.get_fields(); + let mut field_map: HashMap<String, Option<String>> = subscription_fields + .map(|fields| fields.iter().map(|field_name| (field_name.to_string(), None)).collect()) + .unwrap_or_default(); + + let mut field_index = 0; + for value in field_values { + match value { + "" => { + // An empty value means the field is unchanged compared to the previous update of the same field. + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), None); + } + field_index += 1; + } + "#" | "$" => { + // A value corresponding to a hash sign "#" or dollar sign "$" means the field is null or empty. + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some("".to_string())); + } + field_index += 1; + } + value if value.starts_with('^') => { + let command = value.chars().nth(1).unwrap_or(' '); + match command { + '0'..='9' => { + let count = value[1..].parse().unwrap_or(0); + for i in 0..count { + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index + i)) { + field_map.insert(field_name.to_string(), None); + } + } + field_index += count; + } + 'P' | 'T' => { + let diff_value = serde_urlencoded::from_str(&value[2..]).unwrap_or_else(|_| value[2..].to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + if let Some(prev_value) = field_map.get(field_name).and_then(|v| v.as_ref()) { + let new_value = match command { + 'P' => { + // Apply JSON Patch + let patch: serde_json::Value = serde_json::from_str(&diff_value).unwrap_or(serde_json::Value::Null); + let mut prev_json: serde_json::Value = serde_json::from_str(prev_value).unwrap_or(serde_json::Value::Null); + let patch_operations: Vec<json_patch::PatchOperation> = serde_json::from_value(patch).unwrap_or_default(); + let _ = json_patch::patch(&mut prev_json, &patch_operations); + prev_json.to_string() + } + 'T' => { + // Apply TLCP-diff + //tlcp_diff::apply_diff(prev_value, &diff_value).unwrap_or_else(|_| prev_value.to_string()) + unimplemented!("Implement TLCP-diff"); + } + _ => unreachable!(), + }; + field_map.insert(field_name.to_string(), Some(new_value.to_string())); + } + } + field_index += 1; + } + _ => { + let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some(decoded_value)); + } + field_index += 1; + } + } + } + _ => { + let decoded_value = serde_urlencoded::from_str(value).unwrap_or_else(|_| value.to_string()); + if let Some(field_name) = subscription_fields.and_then(|fields| fields.get(field_index)) { + field_map.insert(field_name.to_string(), Some(decoded_value)); + } + field_index += 1; + } + } + } + + //println!("Field values: {:?}", field_map); + + let changed_fields: HashMap<String, Option<String>> = field_map.iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + //println!("Changed fields: {:?}", changed_fields); + + // + // Take the proper item_update from item_updates and update it with changed fields. + // If the item_update doesn't exist yet, create a new one. + // + match item_updates.get_mut(subscription_index) { + Some(item_updates) => match item_updates.get_mut(item_index) { + Some(item_update) => { + // + // Iterate changed_fields and update existing item_update.fields assigning the new values. + // + for (field_name, new_value) in &changed_fields { + if item_update.fields.contains_key(field_name) { + item_update.fields.insert((*field_name).clone(), new_value.clone()); + } + } + item_update.changed_fields = changed_fields; + }, + None => { + // Create a new item_update and add it to item_updates. + let item_update = ItemUpdate { + item_name: item.cloned(), + item_pos: item_index, + fields: field_map, + changed_fields: changed_fields, + is_snapshot: is_snapshot, + }; + item_updates.push(item_update); + } + }, + None => { + // Create a new item_update and add it to item_updates. + let item_update = ItemUpdate { + item_name: item.cloned(), + item_pos: item_index, + fields: field_map, + changed_fields: changed_fields, + is_snapshot: is_snapshot, + }; + item_updates.push(vec![item_update]); + } + }; + + //println!("Item updates: {:?}", item_updates); + + println!("Item updates: {}", serde_json::to_string_pretty(&item_updates).unwrap()); + println!("\n\n"); + + if item_updates.len() >= 3 { + return Ok(()); + } + + /* + if let Some(item_updates) = item_updates.get_mut(subscription_index) { + if let Some(item_update) = item_updates.get_mut(item_index) { + for (field_name, field_value) in changed_fields { + item_update.set_field_value(field_name, field_value); + } + } + } + */ + } // // Connection confirmation from server. // diff --git a/src/main.rs b/src/main.rs index fbb50ba..4959652 100644 --- a/src/main.rs +++ b/src/main.rs @@ -60,11 +60,31 @@ async fn main() -> Result<(), Box<dyn Error>> { let mut my_subscription = Subscription::new( SubscriptionMode::Merge, Some(vec![ - "item2".to_string(), + "item1".to_string(), "item2".to_string(), "item3".to_string(), + "item4".to_string(), + "item5".to_string(), + "item6".to_string(), + "item7".to_string(), + "item8".to_string(), + "item9".to_string(), + "item10".to_string(), + ]), + Some(vec![ + "stock_name".to_string(), + "last_price".to_string(), + "time".to_string(), + "pct_change".to_string(), + "bid_quantity".to_string(), + "bid".to_string(), + "ask".to_string(), + "ask_quantity".to_string(), + "min".to_string(), + "max".to_string(), + "ref_price".to_string(), + "open_price".to_string(), ]), - Some(vec!["stock_name".to_string(), "last_price".to_string()]), )?; my_subscription.set_data_adapter(Some(String::from("QUOTE_ADAPTER")))?; |