aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
blob: 2e3c3c5bd8beb9bc5e5e5279e9e35d5639599567 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use futures::stream::StreamExt;
use reqwest::Client;
use std::error::Error;
use std::sync::{Arc, Mutex};

async fn establish_persistent_http_connection(
    session_id_shared: Arc<Mutex<String>>,
) -> Result<(), reqwest::Error> {
    let client = Client::new();
    let params = [
        ("LS_adapter_set", "DEMO"),
        ("LS_cid", "mgQkwtwdysogQz2BJ4Ji%20kOj2Bg"),
    ];
    let request_url =
        "http://push.lightstreamer.com/lightstreamer/create_session.txt?LS_protocol=TLCP-2.0.0";

    let response = client.post(request_url).form(&params).send().await?;

    if response.status().is_success() {
        let mut stream = response.bytes_stream();

        while let Some(item) = stream.next().await {
            match item {
                Ok(bytes) => {
                    let response_text = String::from_utf8(bytes.to_vec())
                        .expect("Failed to convert bytes to string");
                    if let Some(start) = response_text.find("CONOK,") {
                        if let Some(end) = response_text.find(",50000,5000,*\r\n") {
                            let session_id = &response_text[start + 6..end];
                            println!("Session ID: {}", session_id);
                            let mut session_id_lock = session_id_shared.lock().unwrap();
                            *session_id_lock = session_id.to_string();
                        }
                    } else {
                        println!("New message: {}", response_text);
                    }
                }
                Err(e) => println!("Error while receiving: {:?}", e),
            }
        }
    } else {
        println!("Response was not successful: {}", response.status());
    }

    Ok(())
}

/// Posts the given session ID to the `bind_session` endpoint and reports the outcome.
///
/// The session ID is sent as the `LS_session` form field. Whether the server
/// answered with a success status is printed to stdout; the response body is
/// not read.
///
/// # Errors
///
/// Returns the `reqwest::Error` produced when sending the request. A
/// non-success HTTP status is only printed and still yields `Ok(())`.
async fn subscribe_to_channel(session_id: String) -> Result<(), reqwest::Error> {
    const BIND_SESSION_URL: &str = "http://push.lightstreamer.com/lightstreamer/bind_session.txt";

    let form_fields = [("LS_session", session_id.as_str())];
    let status = Client::new()
        .post(BIND_SESSION_URL)
        .form(&form_fields)
        .send()
        .await?
        .status();

    if !status.is_success() {
        println!("Subscription failed: {}", status);
        return Ok(());
    }

    println!("Subscription successful!");
    Ok(())
}

/// Runs the streaming connection and, once a session ID is available, the
/// follow-up subscription request.
///
/// Two tasks are spawned: one keeps the streaming connection open and writes
/// the session ID into a shared `String`; the other polls that `String` once
/// per second and calls `subscribe_to_channel` as soon as it is non-empty.
///
/// # Errors
///
/// Returns an error if either task panics or if either HTTP request fails to
/// be sent.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let session_id_shared = Arc::new(Mutex::new(String::new()));
    let session_id_for_poller = Arc::clone(&session_id_shared);

    // The task yields the function's own Result so that request errors are
    // propagated to `main` instead of becoming panics inside the task.
    let connection_task = tokio::spawn(establish_persistent_http_connection(Arc::clone(
        &session_id_shared,
    )));

    // Printed right after spawning; the connection itself is set up in the
    // background task and may not be open yet at this point.
    println!("Established connection to Lightstreamer server");
    let subscription_task = tokio::spawn(async move {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;

            // Copy the value out inside its own scope so the guard is released
            // before the next `.await`.
            let session_id = {
                let guard = session_id_for_poller
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                guard.clone()
            };

            if !session_id.is_empty() {
                println!("Accessed Session ID from another thread: {}", session_id);
                // Subscribe once and finish, so that awaiting this task can
                // complete instead of polling forever.
                return subscribe_to_channel(session_id).await;
            }
        }
    });

    // First `?` covers a panicked task, second `?` the request error.
    connection_task.await??;

    // The stream has ended. If no session ID was ever published, the poller
    // has nothing left to wait for and would otherwise loop forever.
    let session_missing = session_id_shared
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .is_empty();
    if session_missing {
        subscription_task.abort();
    }

    match subscription_task.await {
        Ok(subscription_result) => subscription_result?,
        // Cancellation is the expected outcome of the abort above.
        Err(join_error) if join_error.is_cancelled() => {}
        Err(join_error) => return Err(join_error.into()),
    }

    Ok(())
}