1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
use reqwest::Client;
use futures::stream::StreamExt; // Actualizado para usar futures::StreamExt
use tokio::runtime::Runtime;
use std::sync::{Arc, Mutex};
use std::thread;
async fn establish_persistent_http_connection(session_id_shared: Arc<Mutex<String>>) -> Result<(), reqwest::Error> {
let client = Client::new();
let params = [("LS_adapter_set", "DEMO"), ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg")];
let request_url = "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0";
let response = client.post(request_url)
.form(¶ms)
.send()
.await?;
if response.status().is_success() {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
match item {
Ok(bytes) => {
let response_text = String::from_utf8(bytes.to_vec()).expect("Failed to convert bytes to string");
// Intenta extraer el ID de sesión de la primera respuesta
if let Some(start) = response_text.find("CONOK,") {
if let Some(end) = response_text.find(",50000,5000,*\r\n") {
let session_id = &response_text[start + 6..end];
println!("Session ID: {}", session_id);
// Guardar el session_id en la variable compartida
let mut session_id_lock = session_id_shared.lock().unwrap();
*session_id_lock = session_id.to_string();
}
}
},
Err(e) => println!("Error while receiving: {:?}", e),
}
}
} else {
println!("Response was not successful: {}", response.status());
}
Ok(())
}
/// Starts the streaming connection on a Tokio runtime and waits, on a separate OS thread,
/// until the session ID becomes available.
///
/// The waiting thread polls the shared session ID every five seconds. It stops when an ID
/// has been stored, or when the connection task has finished without storing one, so a
/// failed connection no longer leaves the program polling forever.
///
/// # Panics
/// Panics if the Tokio runtime cannot be created, if the shared mutex is poisoned, or if
/// the waiting thread itself panics.
fn main() {
    let rt = Runtime::new().expect("failed to create Tokio runtime");
    // Shared variable that will hold the session ID.
    let session_id_shared = Arc::new(Mutex::new(String::new()));
    let session_id_shared_clone = session_id_shared.clone();
    // Set once the connection task has returned, successfully or not.
    let connection_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let connection_finished_clone = Arc::clone(&connection_finished);

    // Run the connection as a task on the Tokio runtime.
    rt.spawn(async move {
        if let Err(e) = establish_persistent_http_connection(session_id_shared_clone).await {
            eprintln!("Failed to establish connection: {:?}", e);
        }
        connection_finished_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    // Use another thread to read the session ID.
    thread::spawn(move || {
        loop {
            // Wait a while to give the session ID time to arrive.
            thread::sleep(std::time::Duration::from_secs(5));
            // Read the flag BEFORE the session ID: the task stores the ID before it sets
            // the flag, so "flag set and ID still empty" means no ID will ever arrive.
            let finished = connection_finished.load(std::sync::atomic::Ordering::SeqCst);
            let session_id = session_id_shared.lock().expect("session id mutex poisoned");
            if !session_id.is_empty() {
                println!("Accessed Session ID from another thread: {}", *session_id);
                break; // The session ID has been obtained.
            }
            if finished {
                eprintln!("Connection ended before a session ID was received");
                break;
            }
        }
    })
    .join()
    .unwrap();
}
/*
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use reqwest::Client;
use reqwest::Error;
use tokio::time::sleep;
use std::time::Duration;
use std::string::FromUtf8Error;
use std::sync::Arc;
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Session {
key: String,
server: String,
}
async fn establish_connection(client: &Client, server: &str) -> Result<Session, Error> {
println!("Establishing connection to server: {}", server);
let response = client.post(format!("{}/lightstreamer/create_session.txt", server))
.body("LS_adapter_set=DEMO&LS_cid=mgQkwtwdysogQz2BJ4Ji%20kOj2Bg")
.send()
.await?;
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
match item {
Ok(bytes) => {
// Convierte cualquier error de FromUtf8Error a Box<dyn Error>
let message = String::from_utf8(bytes.to_vec()).map_err(|e| Box::new(e) as Box<dyn std::error::Error>);
println!("Received: {:?}", message);
},
Err(e) => {
eprintln!("Error while reading from stream: {}", e);
break;
}
}
}
let session = Session {
key: "S2520d1412903a84dM42fT4356206".to_string(),
server: server.to_string(),
};
Ok(session)
}
async fn keep_alive(session: Arc<Session>) {
let client = Client::new();
loop {
let response = client
.post(&format!("{}/lightstreamer/control.txt", session.server))
.body("LS_op=probe")
.send()
.await;
if let Err(error) = response {
eprintln!("Error en keep_alive: {}", error);
}
sleep(Duration::from_secs(5)).await;
}
}
async fn subscribe_to_channel(session: &Session, channel_id: &str) -> Result<(), Error> {
let client = Client::new();
let response = client
.post(&format!("{}/lightstreamer/control.txt", session.server))
.body(format!(
"LS_op=subscribe&LS_session={}&LS_id={}&LS_mode=DISTINCT",
session.key, channel_id
))
.send()
.await?;
let messages = response.json::<Vec<serde_json::Value>>().await?;
for message in messages {
println!("Message: {:?}", message);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let session = establish_connection(&client, "http://push.lightstreamer.com").await?;
let session_arc = Arc::new(session);
let keep_alive_session = session_arc.clone();
tokio::spawn(async move {
keep_alive(keep_alive_session).await;
});
subscribe_to_channel(&session_arc, "chat_room").await?;
Ok(())
}
*/
|