Reproducible example [main.rs]
My original query loop:
    fn construct_headers() -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(USER_AGENT, HeaderValue::from_static("xxx-zzz"));
        headers
    }

    let mut headers = HeaderMap::new();
    headers.insert(USER_AGENT, HeaderValue::from_static("reqwest"));
    let client = reqwest::Client::builder().build()?;
    //before is shifting timestamps based on last entry's time created in responses to progress back in time
    let params = [("subreddit", subr), ("size", &size.to_string()), ("before", &last.clone())];
    let mut resp = client.get("")
        .query(&params)
        .headers(construct_headers())
        .send()
        .await?
        .text()
        .await?;

    if resp.to_string().contains("Too Many") {
        'rqst: loop {
            thread::sleep(time::Duration::from_secs(1));
            resp = client.get("")
                .query(&params)
                .headers(construct_headers())
                .send()
                .await?
                .text()
                .await?;
            if resp.to_string().contains("Too Many") { continue } else { break 'rqst };
        };
    };

Trying to wrap it in timeout, because it fails occasionally after few hours of polling (infinite timeout) due to the API's problem:
    if resp.to_string().contains("Too Many") {
        'rqst: loop {
            thread::sleep(time::Duration::from_secs(1));
            resp = client.get("")
                .query(&params)
                .headers(construct_headers())
                // .send()
                // .await
                ;
            match tokio::time::timeout(std::time::Duration::from_secs(30), resp.send()).await {
                Ok(result) => match result {
                    Ok(response) => response.text().await?, //convert to text - will this work?
                    Err(e) => return Ok(linkz), //early return
                },
                Err(_) => return Ok(linkz), //early return
            };
            if resp.to_string().contains("Too Many") { continue } else { break 'rqst };
        };
    };

No matter how I try to make it happen (whether resp shadowing ends just with .send() or .await as the commented out lines show) the errors are abundant. Oftentimes tokio expects String, or do I misunderstand it?
Example if ends with .send(). Technically this is the future?:
    error[E0308]: mismatched types
       --> src/main.rs:204:24
        |
    191 |       let mut resp = client.get("")
        |  ________________________-
    192 | |         .query(&params)
    193 | |         .headers(construct_headers())
    194 | |         .send()
    195 | |         .await?
    196 | |         .text()
    197 | |         .await?
        | |___________________- expected due to this value
    204 |           resp = client.get("")
        |  ________________________^
    205 | |             .query(&params)
    206 | |             .headers(construct_headers())
    207 | |             .send()
        | |___________________________^ expected struct `std::string::String`, found opaque type
        |
       ::: /home/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/reqwest-0.11.11/src/async_impl/request.rs:499:26
        |
    499 |     pub fn send(self) -> impl Future<Output = Result<Response, crate::Error>> {
        |                          ---------------------------------------------------- the found opaque type
        |
        = note: expected struct `std::string::String`
                found opaque type `impl Future<Output = Result<reqwest::Response, reqwest::Error>>`

    error[E0277]: `std::string::String` is not a future
       --> src/main.rs:210:80
        |
    210 |     match tokio::time::timeout(std::time::Duration::from_secs(30), resp).await {
        |           --------------------                                     ^^^^ `std::string::String` is not a future
        |           |
        |           required by a bound introduced by this call
        |
        = help: the trait `Future` is not implemented for `std::string::String`
        = note: std::string::String must be a future or must implement `IntoFuture` to be awaited
    note: required by a bound in `timeout`
       --> /home/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.19.2/src/time/timeout.rs:73:8
        |
    73  |     T: Future,
        |        ^^^^^^ required by this bound in `timeout`

    error[E0277]: `std::string::String` is not a future
       --> src/main.rs:210:85
        |
    210 |     match tokio::time::timeout(std::time::Duration::from_secs(30), resp).await {
        |           --------------------------------------------------------------^^^^^^ `std::string::String` is not a future
        |           |
        |           this call returns `std::string::String`
        |
        = help: the trait `Future` is not implemented for `std::string::String`
        = note: std::string::String must be a future or must implement `IntoFuture` to be awaited
        = note: required because of the requirements on the impl of `Future` for `tokio::time::Timeout<std::string::String>`
        = note: required because of the requirements on the impl of `IntoFuture` for `tokio::time::Timeout<std::string::String>`
    help: remove the `.await`
        |
    210 -     match tokio::time::timeout(std::time::Duration::from_secs(30), resp).await {
    210 +     match tokio::time::timeout(std::time::Duration::from_secs(30), resp) {
        |

Example if ends with .headers():
    204 |           resp = client.get("")
        |  ________________________^
    205 | |             .query(&params)
    206 | |             .headers(construct_headers())
        | |_________________________________________________^ expected struct `std::string::String`, found struct `reqwest::RequestBuilder`

In general the future I want to wrap is just

    resp = client.get("")
        .query(&params)
        .headers(construct_headers())
        .send()
        .await?

However it doesn't work no matter how I put it in the tokio timeout.
I've never used tokio's timeout until now, how do I do this? What's wrong here and why is String expected? This works normally with just .awaiting the send().
According to the snippet from previous error

    found opaque type
        |
        = note: expected struct `std::string::String`
                found opaque type `impl Future<Output = Result<reqwest::Response, reqwest::Error>>`

it expects String....Why?

    pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
    where
        T: Future,

I'm putting a future there.
EDIT:
Fixed the issue somewhat by dropping the shadowing idea. But I believe the program will still fail. The problem was that the first resp results in a String, so later it will expect a String.
    if resp.to_string().contains("Too Many") {
        println!("2many rqstz");
        'rqst: loop {
            thread::sleep(time::Duration::from_secs(1));
            let resp = client.get("")
                .query(&params)
                .headers(construct_headers())
                .send();
            match tokio::time::timeout(std::time::Duration::from_secs(30), async { &resp }).await {
                Ok(result) => result,
                Err(_) => return Ok(linkz),
            };
            if resp.await?.text().await?.contains("Too Many") {
                println!("2mny");
                continue
            } else {
                break 'rqst
            };
        };
    };

I am afraid this will get stuck in the loop endlessly again because of

    if resp.await?.text().await?.contains("Too Many"){

this line. But I will know after hours of it running (the API calls await forever only after long time, at the last ever responses from a subreddit).
If anybody has any tips on how to make this safer: it seems to me that the if condition now awaits resp separately? So without shadowing, if resp times out, the if will get stuck in the infinite loop, which was my initial problem?
However, if the first attempt fails, it should return early. Since it's still messy, please advise.
1 Answer
First off, a couple of things I would improve:
- resp.to_string() doesn't do much, because resp is already a string. It simply copies it into a new object. Just remove .to_string().
- Never use thread::sleep in an async program. Use tokio::time::sleep instead. Otherwise you block the entire event loop.
- Don't just return Ok(()) in an error case, use proper error handling instead.
- You query twice, once outside and once inside the loop. You can combine those two.
- You timeout on the send(), but not on the text(). The text() can hang just as much, as it is the actual data transfer. You can move the entire logic into a nested function to simplify it.
You already figured out that the problem with your change was that resp is already a String. You cannot assign the result of .headers() to it. You can, however, simply create a new variable for that.
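The type mismatch can be reproduced without reqwest or tokio. Here is a minimal sketch, assuming a made-up Builder type that stands in for reqwest::RequestBuilder (Builder, build_request, finish and fetch_twice are hypothetical names used only for illustration). A `let mut` variable keeps the type of its first value, so every later assignment has to be a String as well, while a separate `let` binding can hold the intermediate value of a different type.

```rust
// Hypothetical stand-in for a request builder; this is NOT reqwest's API.
struct Builder {
    url: String,
}

impl Builder {
    // Consumes the builder and produces the final text,
    // loosely playing the role of `.send()` followed by `.text()`.
    fn finish(self) -> String {
        format!("response from {}", self.url)
    }
}

fn build_request(url: &str) -> Builder {
    Builder { url: url.to_string() }
}

fn fetch_twice() -> String {
    // From here on, `resp` is a String and can only be assigned Strings.
    let mut resp: String = build_request("first").finish();

    if resp.contains("first") {
        // resp = build_request("second");
        // ^ would fail with error[E0308]: expected `String`, found `Builder`

        // A new binding may have any type, so the builder gets its own variable.
        let request = build_request("second");
        resp = request.finish(); // String assigned to a String variable: fine
    }
    resp
}

fn main() {
    println!("{}", fetch_twice());
}
```

The same reasoning applies to the original code: the unfinished request (or the future returned by .send()) needs its own variable, and only the final text goes back into resp.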
That said, I took the liberty of implementing the feedback and did some refactoring:
    use reqwest::{
        header::{HeaderMap, HeaderValue, USER_AGENT},
        Client,
    };
    use std::time::Duration;
    use thiserror::Error;

    #[derive(Error, Debug)]
    pub enum RedditPollError {
        #[error("poll timed out")]
        PollTimeout,
        #[error("the connection failed")]
        ConnectionFailed(#[from] reqwest::Error),
    }

    async fn poll_reddit_submissions() -> Result<String, RedditPollError> {
        let subr = "selfie"; //example subreddit
        let last = "1658059417"; //current time
        let size = "100"; //max response size

        fn construct_headers() -> HeaderMap {
            let mut headers = HeaderMap::new();
            headers.insert(USER_AGENT, HeaderValue::from_static("xxx-zzz"));
            headers
        }

        let mut headers = HeaderMap::new();
        headers.insert(USER_AGENT, HeaderValue::from_static("reqwest"));

        let client = reqwest::Client::builder().build()?;
        let params = [
            ("subreddit", subr),
            ("size", &size.to_string()),
            ("before", &last.clone()),
        ]; //before is shifting timestamps based on last entry's time created in responses to progress back in time

        Ok(loop {
            async fn try_poll(client: &Client, params: &[(&str, &str)]) -> reqwest::Result<String> {
                client
                    .get("")
                    .query(params)
                    .headers(construct_headers())
                    .send()
                    .await?
                    .text()
                    .await
            }

            let response = tokio::time::timeout(Duration::from_secs(30), try_poll(&client, &params))
                .await
                .map_err(|_| RedditPollError::PollTimeout)?
                .map_err(|e| RedditPollError::ConnectionFailed(e))?;

            if response.contains("Too Many") {
                tokio::time::sleep(Duration::from_secs(1)).await;
            } else {
                break response;
            };
        })
    }

    #[tokio::main]
    async fn main() {
        match poll_reddit_submissions().await {
            Ok(message) => println!("{}", message),
            Err(e) => println!("Failed: {}", e),
        }
    }

I assume that some of the parameters will probably have to be moved out of poll_reddit_submissions and become a function parameter instead.
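One more thought regarding the worry about endless looping: each attempt in the code above is bounded by the timeout, but the loop itself keeps retrying for as long as the response contains "Too Many". If that is a concern, the number of attempts could be capped. Below is a minimal synchronous sketch of that idea using only the standard library; poll_with_limit, RetryError and max_attempts are hypothetical names, and the closure is a stand-in for the real request.

```rust
// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum RetryError {
    TooManyAttempts,
}

// Calls `attempt` until the returned body no longer contains "Too Many",
// giving up after `max_attempts` tries.
fn poll_with_limit<F>(max_attempts: u32, mut attempt: F) -> Result<String, RetryError>
where
    F: FnMut() -> String,
{
    for _ in 0..max_attempts {
        let body = attempt();
        if !body.contains("Too Many") {
            return Ok(body);
        }
        // In the async version, `tokio::time::sleep(...).await` would go here.
    }
    Err(RetryError::TooManyAttempts)
}

fn main() {
    // Simulated server: rate-limits the first two calls, then answers.
    let mut calls = 0;
    let result = poll_with_limit(5, || {
        calls += 1;
        if calls <= 2 {
            "Too Many Requests".to_string()
        } else {
            "ok".to_string()
        }
    });
    println!("{:?}", result);
}
```

In the async version the same counter could live in the existing loop, with an extra RedditPollError variant returned once the limit is reached. Whether a cap makes sense depends on how the API behaves, so treat this as an option rather than a requirement.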