fix(http): web requests (#3643)
This commit is contained in:
parent
a88b34f54f
commit
ba2772e31c
5 changed files with 125 additions and 627 deletions
617
Cargo.lock
generated
617
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -6,10 +6,10 @@ use zellij_utils::consts::{
|
||||||
use zellij_utils::data::{Event, HttpVerb, SessionInfo};
|
use zellij_utils::data::{Event, HttpVerb, SessionInfo};
|
||||||
use zellij_utils::errors::{prelude::*, BackgroundJobContext, ContextType};
|
use zellij_utils::errors::{prelude::*, BackgroundJobContext, ContextType};
|
||||||
use zellij_utils::input::layout::RunPlugin;
|
use zellij_utils::input::layout::RunPlugin;
|
||||||
use zellij_utils::surf::{
|
|
||||||
http::{Method, Url},
|
use zellij_utils::isahc::prelude::*;
|
||||||
RequestBuilder,
|
use zellij_utils::isahc::AsyncReadResponseExt;
|
||||||
};
|
use zellij_utils::isahc::{config::RedirectPolicy, HttpClient, Request};
|
||||||
|
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
|
@ -101,6 +101,12 @@ pub(crate) fn background_jobs_main(
|
||||||
let serialization_interval = serialization_interval.map(|s| s * 1000); // convert to
|
let serialization_interval = serialization_interval.map(|s| s * 1000); // convert to
|
||||||
// milliseconds
|
// milliseconds
|
||||||
|
|
||||||
|
let http_client = HttpClient::builder()
|
||||||
|
// TODO: timeout?
|
||||||
|
.redirect_policy(RedirectPolicy::Follow)
|
||||||
|
.build()
|
||||||
|
.ok();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (event, mut err_ctx) = bus.recv().with_context(err_context)?;
|
let (event, mut err_ctx) = bus.recv().with_context(err_context)?;
|
||||||
err_ctx.add_call(ContextType::BackgroundJob((&event).into()));
|
err_ctx.add_call(ContextType::BackgroundJob((&event).into()));
|
||||||
|
|
@ -275,41 +281,60 @@ pub(crate) fn background_jobs_main(
|
||||||
BackgroundJob::WebRequest(plugin_id, client_id, url, verb, headers, body, context) => {
|
BackgroundJob::WebRequest(plugin_id, client_id, url, verb, headers, body, context) => {
|
||||||
task::spawn({
|
task::spawn({
|
||||||
let senders = bus.senders.clone();
|
let senders = bus.senders.clone();
|
||||||
|
let http_client = http_client.clone();
|
||||||
async move {
|
async move {
|
||||||
async fn web_request(
|
async fn web_request(
|
||||||
url: String,
|
url: String,
|
||||||
verb: HttpVerb,
|
verb: HttpVerb,
|
||||||
headers: BTreeMap<String, String>,
|
headers: BTreeMap<String, String>,
|
||||||
body: Vec<u8>,
|
body: Vec<u8>,
|
||||||
|
http_client: HttpClient,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(u16, BTreeMap<String, String>, Vec<u8>), // status_code, headers, body
|
(u16, BTreeMap<String, String>, Vec<u8>), // status_code, headers, body
|
||||||
zellij_utils::surf::Error,
|
zellij_utils::isahc::Error,
|
||||||
> {
|
> {
|
||||||
let url = Url::parse(&url)?;
|
let mut request = match verb {
|
||||||
let http_method = match verb {
|
HttpVerb::Get => Request::get(url),
|
||||||
HttpVerb::Get => Method::Get,
|
HttpVerb::Post => Request::post(url),
|
||||||
HttpVerb::Post => Method::Post,
|
HttpVerb::Put => Request::put(url),
|
||||||
HttpVerb::Put => Method::Put,
|
HttpVerb::Delete => Request::delete(url),
|
||||||
HttpVerb::Delete => Method::Delete,
|
|
||||||
};
|
};
|
||||||
let mut req = RequestBuilder::new(http_method, url);
|
|
||||||
if !body.is_empty() {
|
|
||||||
req = req.body(body);
|
|
||||||
}
|
|
||||||
for (header, value) in headers {
|
for (header, value) in headers {
|
||||||
req = req.header(header.as_str(), value);
|
request = request.header(header.as_str(), value);
|
||||||
}
|
}
|
||||||
let mut res = req.await?;
|
let mut res = if !body.is_empty() {
|
||||||
|
let req = request.body(body)?;
|
||||||
|
http_client.send_async(req).await?
|
||||||
|
} else {
|
||||||
|
let req = request.body(())?;
|
||||||
|
http_client.send_async(req).await?
|
||||||
|
};
|
||||||
|
|
||||||
let status_code = res.status();
|
let status_code = res.status();
|
||||||
let headers: BTreeMap<String, String> = res
|
let headers: BTreeMap<String, String> = res
|
||||||
|
.headers()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(name, value)| (name.to_string(), value.to_string()))
|
.filter_map(|(name, value)| match value.to_str() {
|
||||||
|
Ok(value) => Some((name.to_string(), value.to_string())),
|
||||||
|
Err(e) => {
|
||||||
|
log::error!(
|
||||||
|
"Failed to convert header {:?} to string: {:?}",
|
||||||
|
name,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let body = res.take_body().into_bytes().await?;
|
let body = res.bytes().await?;
|
||||||
Ok((status_code as u16, headers, body))
|
Ok((status_code.as_u16(), headers, body))
|
||||||
}
|
}
|
||||||
|
let Some(http_client) = http_client else {
|
||||||
|
log::error!("Cannot perform http request, likely due to a misconfigured http client");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
match web_request(url, verb, headers, body).await {
|
match web_request(url, verb, headers, body, http_client).await {
|
||||||
Ok((status, headers, body)) => {
|
Ok((status, headers, body)) => {
|
||||||
let _ = senders.send_to_plugin(PluginInstruction::Update(vec![(
|
let _ = senders.send_to_plugin(PluginInstruction::Update(vec![(
|
||||||
Some(plugin_id),
|
Some(plugin_id),
|
||||||
|
|
|
||||||
|
|
@ -56,10 +56,8 @@ async-std = { version = "1.3.0", features = ["unstable", "attributes"] }
|
||||||
notify-debouncer-full = "0.1.0"
|
notify-debouncer-full = "0.1.0"
|
||||||
humantime = "2.1.0"
|
humantime = "2.1.0"
|
||||||
futures = "0.3.28"
|
futures = "0.3.28"
|
||||||
surf = { version = "2.3.2", default-features = false, features = [
|
|
||||||
"curl-client",
|
|
||||||
] }
|
|
||||||
openssl-sys = { version = "0.9.93", features = ["vendored"] }
|
openssl-sys = { version = "0.9.93", features = ["vendored"] }
|
||||||
|
isahc = "1.7.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
insta = { version = "1.6.0", features = ["backtrace"] }
|
insta = { version = "1.6.0", features = ["backtrace"] }
|
||||||
|
|
|
||||||
|
|
@ -3,15 +3,18 @@ use async_std::{
|
||||||
io::{ReadExt, WriteExt},
|
io::{ReadExt, WriteExt},
|
||||||
stream::StreamExt,
|
stream::StreamExt,
|
||||||
};
|
};
|
||||||
|
use isahc::prelude::*;
|
||||||
|
use isahc::{config::RedirectPolicy, HttpClient, Request};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use surf::Client;
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum DownloaderError {
|
pub enum DownloaderError {
|
||||||
#[error("RequestError: {0}")]
|
#[error("RequestError: {0}")]
|
||||||
Request(surf::Error),
|
Request(#[from] isahc::Error),
|
||||||
|
#[error("HttpError: {0}")]
|
||||||
|
HttpError(#[from] isahc::http::Error),
|
||||||
#[error("IoError: {0}")]
|
#[error("IoError: {0}")]
|
||||||
Io(#[source] std::io::Error),
|
Io(#[source] std::io::Error),
|
||||||
#[error("File name cannot be found in URL: {0}")]
|
#[error("File name cannot be found in URL: {0}")]
|
||||||
|
|
@ -22,14 +25,18 @@ pub enum DownloaderError {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Downloader {
|
pub struct Downloader {
|
||||||
client: Client,
|
client: Option<HttpClient>,
|
||||||
location: PathBuf,
|
location: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Downloader {
|
impl Default for Downloader {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
client: surf::client().with(surf::middleware::Redirect::default()),
|
client: HttpClient::builder()
|
||||||
|
// TODO: timeout?
|
||||||
|
.redirect_policy(RedirectPolicy::Follow)
|
||||||
|
.build()
|
||||||
|
.ok(),
|
||||||
location: PathBuf::from(""),
|
location: PathBuf::from(""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -38,7 +45,11 @@ impl Default for Downloader {
|
||||||
impl Downloader {
|
impl Downloader {
|
||||||
pub fn new(location: PathBuf) -> Self {
|
pub fn new(location: PathBuf) -> Self {
|
||||||
Self {
|
Self {
|
||||||
client: surf::client().with(surf::middleware::Redirect::default()),
|
client: HttpClient::builder()
|
||||||
|
// TODO: timeout?
|
||||||
|
.redirect_policy(RedirectPolicy::Follow)
|
||||||
|
.build()
|
||||||
|
.ok(),
|
||||||
location,
|
location,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -48,17 +59,19 @@ impl Downloader {
|
||||||
url: &str,
|
url: &str,
|
||||||
file_name: Option<&str>,
|
file_name: Option<&str>,
|
||||||
) -> Result<(), DownloaderError> {
|
) -> Result<(), DownloaderError> {
|
||||||
|
let Some(client) = &self.client else {
|
||||||
|
log::error!("No Http client found, cannot perform requests - this is likely a misconfiguration of isahc::HttpClient");
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
let file_name = match file_name {
|
let file_name = match file_name {
|
||||||
Some(name) => name.to_string(),
|
Some(name) => name.to_string(),
|
||||||
None => self.parse_name(url)?,
|
None => self.parse_name(url)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let file_path = self.location.join(file_name.as_str());
|
let file_path = self.location.join(file_name.as_str());
|
||||||
if file_path.exists() {
|
if file_path.exists() {
|
||||||
log::debug!("File already exists: {:?}", file_path);
|
log::debug!("File already exists: {:?}", file_path);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let file_part_path = self.location.join(format!("{}.part", file_name));
|
let file_part_path = self.location.join(format!("{}.part", file_name));
|
||||||
let (mut target, file_part_size) = {
|
let (mut target, file_part_size) = {
|
||||||
if file_part_path.exists() {
|
if file_part_path.exists() {
|
||||||
|
|
@ -86,16 +99,13 @@ impl Downloader {
|
||||||
(file_part, 0)
|
(file_part, 0)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let request = Request::get(url)
|
||||||
let res = self
|
|
||||||
.client
|
|
||||||
.get(url)
|
|
||||||
.header("Content-Type", "application/octet-stream")
|
.header("Content-Type", "application/octet-stream")
|
||||||
.header("Range", format!("bytes={}-", file_part_size))
|
.header("Range", format!("bytes={}-", file_part_size))
|
||||||
.await
|
.body(())?;
|
||||||
.map_err(|e| DownloaderError::Request(e))?;
|
let mut res = client.send_async(request).await?;
|
||||||
|
let body = res.body_mut();
|
||||||
let mut stream = res.bytes();
|
let mut stream = body.bytes();
|
||||||
while let Some(byte) = stream.next().await {
|
while let Some(byte) = stream.next().await {
|
||||||
let byte = byte.map_err(|e| DownloaderError::Io(e))?;
|
let byte = byte.map_err(|e| DownloaderError::Io(e))?;
|
||||||
target
|
target
|
||||||
|
|
@ -113,17 +123,19 @@ impl Downloader {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub async fn download_without_cache(url: &str) -> Result<String, DownloaderError> {
|
pub async fn download_without_cache(url: &str) -> Result<String, DownloaderError> {
|
||||||
// result is the stringified body
|
let request = Request::get(url)
|
||||||
let client = surf::client().with(surf::middleware::Redirect::default());
|
|
||||||
|
|
||||||
let res = client
|
|
||||||
.get(url)
|
|
||||||
.header("Content-Type", "application/octet-stream")
|
.header("Content-Type", "application/octet-stream")
|
||||||
.await
|
.body(())?;
|
||||||
.map_err(|e| DownloaderError::Request(e))?;
|
let client = HttpClient::builder()
|
||||||
|
// TODO: timeout?
|
||||||
|
.redirect_policy(RedirectPolicy::Follow)
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
let mut res = client.send_async(request).await?;
|
||||||
|
|
||||||
let mut downloaded_bytes: Vec<u8> = vec![];
|
let mut downloaded_bytes: Vec<u8> = vec![];
|
||||||
let mut stream = res.bytes();
|
let body = res.body_mut();
|
||||||
|
let mut stream = body.bytes();
|
||||||
while let Some(byte) = stream.next().await {
|
while let Some(byte) = stream.next().await {
|
||||||
let byte = byte.map_err(|e| DownloaderError::Io(e))?;
|
let byte = byte.map_err(|e| DownloaderError::Io(e))?;
|
||||||
downloaded_bytes.push(byte);
|
downloaded_bytes.push(byte);
|
||||||
|
|
|
||||||
|
|
@ -25,9 +25,9 @@ pub mod logging; // Requires log4rs
|
||||||
|
|
||||||
#[cfg(not(target_family = "wasm"))]
|
#[cfg(not(target_family = "wasm"))]
|
||||||
pub use ::{
|
pub use ::{
|
||||||
anyhow, async_channel, async_std, clap, common_path, humantime, interprocess, lazy_static,
|
anyhow, async_channel, async_std, clap, common_path, humantime, interprocess, isahc,
|
||||||
libc, miette, nix, notify_debouncer_full, regex, serde, signal_hook, surf, tempfile, termwiz,
|
lazy_static, libc, miette, nix, notify_debouncer_full, regex, serde, signal_hook, tempfile,
|
||||||
url, uuid, vte,
|
termwiz, url, uuid, vte,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use ::prost;
|
pub use ::prost;
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue