aboutsummaryrefslogtreecommitdiff
path: root/src/ls_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ls_client.rs')
-rw-r--r--src/ls_client.rs339
1 files changed, 330 insertions, 9 deletions
diff --git a/src/ls_client.rs b/src/ls_client.rs
index 59dd247..26328f7 100644
--- a/src/ls_client.rs
+++ b/src/ls_client.rs
@@ -3,10 +3,42 @@ use crate::client_message_listener::ClientMessageListener;
use crate::connection_details::ConnectionDetails;
use crate::connection_options::ConnectionOptions;
use crate::subscription::Subscription;
-use crate::IllegalStateException;
+use crate::error::IllegalStateException;
+use crate::util::*;
use cookie::Cookie;
+use futures_util::{SinkExt, StreamExt};
+use std::collections::HashMap;
use std::fmt::{self, Debug, Formatter};
+use tokio_tungstenite::{
+ connect_async,
+ tungstenite::{
+ http::{HeaderName, HeaderValue, Request},
+ Message,
+ },
+};
+use url::Url;
+
+/// Represents the current status of the `LightstreamerClient`.
+pub enum ClientStatus {
+ Connecting,
+ Connected(ConnectionType),
+ Stalled,
+ Disconnected(DisconnectionType),
+}
+
+pub enum ConnectionType {
+ HttpPolling,
+ HttpStreaming,
+ StreamSensing,
+ WsPolling,
+ WsStreaming,
+}
+
+pub enum DisconnectionType {
+ WillRetry,
+ TryingRecovery,
+}
/// Facade class for the management of the communication to Lightstreamer Server. Used to provide
/// configuration settings, event handlers, operations for the control of the connection lifecycle,
@@ -46,7 +78,10 @@ use std::fmt::{self, Debug, Formatter};
/// * `IllegalArgumentException`: if a not valid address is passed. See `ConnectionDetails.setServerAddress()`
/// for details.
pub struct LightstreamerClient {
+ /// The address of the Lightstreamer Server to which this `LightstreamerClient` will connect.
server_address: Option<String>,
+ /// The name of the Adapter Set mounted on Lightstreamer Server to be used to handle all
+ /// requests in the Session associated with this `LightstreamerClient`.
adapter_set: Option<String>,
/// Data object that contains the details needed to open a connection to a Lightstreamer Server.
/// This instance is set up by the `LightstreamerClient` object at its own creation. Properties
@@ -56,8 +91,13 @@ pub struct LightstreamerClient {
/// is set up by the `LightstreamerClient` object at its own creation. Properties of this object
/// can be overwritten by values received from a Lightstreamer Server.
pub connection_options: ConnectionOptions,
+ /// A list of listeners that will receive events from the `LightstreamerClient` instance.
listeners: Vec<Box<dyn ClientListener>>,
+ /// A list containing all the `Subscription` instances that are currently "active" on this
+ /// `LightstreamerClient`.
subscriptions: Vec<Subscription>,
+ /// The current status of the client.
+ status: ClientStatus,
}
impl Debug for LightstreamerClient {
@@ -147,11 +187,239 @@ impl LightstreamerClient {
/// See also `ClientListener.onStatusChange()`
///
/// See also `ConnectionDetails.setServerAddress()`
- pub fn connect(&mut self) -> Result<(), IllegalStateException> {
+ pub async fn connect(&mut self) -> Result<(), IllegalStateException> {
if self.server_address.is_none() {
- return Err(IllegalStateException::new("No server address was configured."));
+ return Err(IllegalStateException::new(
+ "No server address was configured.",
+ ));
+ }
+
+ let forced_transport = self.connection_options.get_forced_transport();
+ if forced_transport.is_none() || *forced_transport.unwrap() != Transport::WsStreaming {
+ // unwrap() is safe here.
+ return Err(IllegalStateException::new(
+ "Only WebSocket streaming transport is currently supported.",
+ ));
+ }
+
+ let mut params = HashMap::new();
+
+ //
+ // Build the mandatory request parameters.
+ //
+
+ params.insert("LS_protocol", "TLCP-2.5.0");
+ params.insert("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg");
+
+ //
+ // Add optional parameters
+ //
+
+ /*
+
+ if let Some(user) = &self.connection_details.get_user() {
+ params.insert("LS_user", user);
+ }
+
+ if let Some(password) = &self.connection_details.get_password() {
+ params.insert("LS_password", password);
+ }
+
+ if let Some(adapter_set) = &self.adapter_set {
+ params.insert("LS_adapter_set", adapter_set);
+ }
+
+ if let Some(requested_max_bandwidth) = self.connection_options.get_requested_max_bandwidth() {
+ params.insert("LS_requested_max_bandwidth", &requested_max_bandwidth.to_string());
+ }
+
+ if let Some(content_length) = self.connection_options.get_content_length() {
+ params.insert("LS_content_length", &content_length.to_string());
+ }
+
+ if let Some(supported_diffs) = &self.connection_options.get_supported_diffs() {
+ params.insert("LS_supported_diffs", supported_diffs);
+ }
+
+ if self.connection_options.is_polling() {
+ params.insert("LS_polling", "true");
+ if let Some(polling_millis) = self.connection_options.get_polling_millis() {
+ params.insert("LS_polling_millis", &polling_millis.to_string());
+ }
+
+ if let Some(idle_millis) = self.connection_options.get_idle_millis() {
+ params.insert("LS_idle_millis", &idle_millis.to_string());
+ }
+ } else {
+ if let Some(inactivity_millis) = self.connection_options.get_inactivity_millis() {
+ params.insert("LS_inactivity_millis", &inactivity_millis.to_string());
+ }
+
+ if let Some(keepalive_millis) = self.connection_options.get_keepalive_millis() {
+ params.insert("LS_keepalive_millis", &keepalive_millis.to_string());
+ }
+
+ if !self.connection_options.is_send_sync() {
+ params.insert("LS_send_sync", "false");
+ }
+ }
+
+ if self.connection_options.is_reduce_head() {
+ params.insert("LS_reduce_head", "true");
+ }
+
+ if let Some(ttl_millis) = self.connection_options.get_ttl_millis() {
+ params.insert("LS_ttl_millis", &ttl_millis.to_string());
+ }
+
+ // Build the request body
+ let request_body = build_request_body(&params);
+
+ // Send the create session request
+ let response = send_create_session_request(
+ &self.server_address.as_ref().unwrap(),
+ &request_body,
+ )?;
+
+ // Process the server response
+ process_create_session_response(&response)?;
+
+ */
+
+ //
+ // Convert the HTTP URL to a WebSocket URL.
+ //
+ let http_url = self.connection_details.get_server_address().unwrap(); // unwrap() is safe here.
+ let mut url = Url::parse(&http_url)
+ .expect("Failed to parse server address URL from connection details.");
+ match url.scheme() {
+ "http" => url
+ .set_scheme("ws")
+ .expect("Failed to set scheme to ws for WebSocket URL."),
+ "https" => url
+ .set_scheme("wss")
+ .expect("Failed to set scheme to wss for WebSocket URL."),
+ invalid_scheme => {
+ return Err(IllegalStateException::new(&format!(
+ "Unsupported scheme '{}' found when converting HTTP URL to WebSocket URL.",
+ invalid_scheme
+ )));
+ }
+ }
+ let ws_url = url.as_str();
+
+ // Build the WebSocket request with the necessary headers.
+ let request = Request::builder()
+ .uri(ws_url)
+ .header(
+ HeaderName::from_static("connection"),
+ HeaderValue::from_static("Upgrade"),
+ )
+ .header(
+ HeaderName::from_static("host"),
+ HeaderValue::from_str(url.host_str().unwrap_or("localhost")).map_err(|err| {
+ IllegalStateException::new(&format!("Invalid header value for header with name 'host': {}", err))
+ })?,
+ )
+ .header(
+ HeaderName::from_static("sec-websocket-key"),
+ HeaderValue::from_static("PNDUibe9ex7PnsrLbt0N4w=="),
+ )
+ .header(
+ HeaderName::from_static("sec-websocket-protocol"),
+ HeaderValue::from_static("TLCP-2.5.0.lightstreamer.com"),
+ )
+ .header(
+ HeaderName::from_static("sec-websocket-version"),
+ HeaderValue::from_static("13"),
+ )
+ .header(
+ HeaderName::from_static("upgrade"),
+ HeaderValue::from_static("websocket"),
+ )
+ .body(())
+ .unwrap();
+
+ // Connect to the Lightstreamer server using WebSocket.
+ let ws_stream = match connect_async(request).await {
+ Ok((ws_stream, response)) => {
+ if let Some(server_header) = response.headers().get("server") {
+ println!("Connected to Lightstreamer server: {}", server_header.to_str().unwrap_or(""));
+ } else {
+ println!("Connected to Lightstreamer server");
+ }
+ ws_stream
+ },
+ Err(err) => {
+ return Err(IllegalStateException::new(&format!(
+ "Failed to connect to Lightstreamer server with WebSocket: {}",
+ err
+ )));
+ }
+ };
+
+ // Split the WebSocket stream into a write and read stream.
+ let (mut write_stream, mut read_stream) = ws_stream.split();
+
+ //
+ // Confirm the connection by sending a 'wsok' message to the server.
+ //
+ write_stream.send(Message::Text("wsok".into())).await.expect("Failed to send message");
+ if let Some(result) = read_stream.next().await {
+ match result? {
+ Message::Text(text) => {
+ let clean_text = clean_message(&text);
+ if clean_text == "wsok" {
+ println!("Connection confirmed by server");
+ } else {
+ return Err(IllegalStateException::new(&format!(
+ "Unexpected message received from server: {}",
+ clean_text
+ )));
+ }
+ },
+ non_text_message => {
+ println!("Unexpected non-text message from server: {:?}", non_text_message);
+ },
+ }
+ }
+
+ /*
+ // Session creation parameters
+ let params = [
+ ("LS_op2", "create_session"),
+ ("LS_cid", "mgQkwtwdysogQz2BJ4Ji kOj2Bg"),
+ ("LS_adapter_set", "DEMO"),
+ ];
+
+ let encoded_params = serde_urlencoded::to_string(&params)?;
+
+ // Send the create session message
+ write_stream
+ .send(Message::Text(format!("{}\n", encoded_params)))
+ .await?;
+ */
+
+ // Listen for messages from the server
+ while let Some(message) = read_stream.next().await {
+ match message? {
+ Message::Text(text) => {
+ if text.starts_with("CONOK") {
+ let session_info: Vec<&str> = text.split(",").collect();
+ let session_id = session_info.get(1).unwrap_or(&"").to_string();
+ println!("Session established with ID: {}", session_id);
+ //subscribe_to_channel_ws(session_id, write_stream).await?;
+ break; // Exit after successful subscription
+ } else {
+ println!("Received unexpected message from server: {}", text);
+ }
+ }
+ msg => { println!("Received non-text message from server: {:?}", msg); }
+ }
}
+ println!("No more messages from server");
+
Ok(())
}
@@ -173,6 +441,7 @@ impl LightstreamerClient {
/// See also `connect()`
pub fn disconnect(&mut self) {
// Implementation for disconnect
+ println!("Disconnecting from Lightstreamer server");
}
/// Static inquiry method that can be used to share cookies between connections to the Server
@@ -229,9 +498,8 @@ impl LightstreamerClient {
/// - `"DISCONNECTED"`: no connection is currently active.
///
/// See also `ClientListener.onStatusChange()`
- pub fn get_status(&self) -> &str {
- // Implementation for get_status
- unimplemented!()
+ pub fn get_status(&self) -> &ClientStatus {
+ &self.status
}
/// Inquiry method that returns a list containing all the `Subscription` instances that are
@@ -291,7 +559,7 @@ impl LightstreamerClient {
) -> Result<LightstreamerClient, IllegalStateException> {
let connection_details = ConnectionDetails::new(server_address, adapter_set);
let connection_options = ConnectionOptions::new();
-
+
Ok(LightstreamerClient {
server_address: server_address.map(|s| s.to_string()),
adapter_set: adapter_set.map(|s| s.to_string()),
@@ -299,6 +567,7 @@ impl LightstreamerClient {
connection_options,
listeners: Vec::new(),
subscriptions: Vec::new(),
+ status: ClientStatus::Disconnected(DisconnectionType::WillRetry),
})
}
@@ -401,7 +670,30 @@ impl LightstreamerClient {
listener: Option<Box<dyn ClientMessageListener>>,
enqueue_while_disconnected: bool,
) {
- // Implementation for send_message
+ let sequence = sequence.unwrap_or_else(|| "UNORDERED_MESSAGES");
+
+ // Handle the message based on the current connection status
+ match &self.status {
+ ClientStatus::Connected(connection_type) => {
+ // Send the message to the server in a separate thread
+ // ...
+ }
+ ClientStatus::Disconnected(disconnection_type) => {
+ if enqueue_while_disconnected {
+ // Enqueue the message to be sent when a connection is available
+ // ...
+ } else {
+ // Abort the message and notify the listener
+ if let Some(listener) = listener {
+ listener.on_abort(message, false);
+ }
+ }
+ }
+ _ => {
+ // Enqueue the message to be sent when a connection is available
+ // ...
+ }
+ }
}
/// Static method that permits to configure the logging system used by the library. The logging
@@ -521,4 +813,33 @@ impl LightstreamerClient {
}
}
*/
-} \ No newline at end of file
+}
+
+/// The transport type to be used by the client.
+/// - WS: the Stream-Sense algorithm is enabled as in the `None` case but the client will
+/// only use WebSocket based connections. If a connection over WebSocket is not possible
+/// because of the environment the client will not connect at all.
+/// - HTTP: the Stream-Sense algorithm is enabled as in the `None` case but the client
+/// will only use HTTP based connections. If a connection over HTTP is not possible because
+/// of the environment the client will not connect at all.
+/// - WS-STREAMING: the Stream-Sense algorithm is disabled and the client will only connect
+/// on Streaming over WebSocket. If Streaming over WebSocket is not possible because of
+/// the environment the client will not connect at all.
+/// - HTTP-STREAMING: the Stream-Sense algorithm is disabled and the client will only
+/// connect on Streaming over HTTP. If Streaming over HTTP is not possible because of the
+/// browser/environment the client will not connect at all.
+/// - WS-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
+/// on Polling over WebSocket. If Polling over WebSocket is not possible because of the
+/// environment the client will not connect at all.
+/// - HTTP-POLLING: the Stream-Sense algorithm is disabled and the client will only connect
+/// on Polling over HTTP. If Polling over HTTP is not possible because of the environment
+/// the client will not connect at all.
+#[derive(Debug, PartialEq)]
+pub enum Transport {
+ Ws,
+ Http,
+ WsStreaming,
+ HttpStreaming,
+ WsPolling,
+ HttpPolling,
+}