Add configurable global concurrent request limiting

- Add max_concurrent_requests config option (default: 1000)
- Implement global AtomicUsize counter for concurrent request tracking
- Return status 41 'Server unavailable' when limit exceeded
- Proper counter management with decrements on all exit paths
- Add comprehensive config validation (1-1,000,000 range)
- Update documentation with rate limiting details
- Add unit tests for config parsing
- Thread-safe implementation using Ordering::Relaxed

This provides basic overload protection by capping concurrent
connections, preventing resource exhaustion under load spikes,
while remaining configurable for different deployment scenarios.
(Note: this limits concurrency only; it is not a full DDoS defense.)
This commit is contained in:
Jeena 2026-01-16 02:26:59 +00:00
parent 9d29321806
commit 0468781a69
5 changed files with 54 additions and 6 deletions

View file

@ -3,11 +3,14 @@ use crate::logging::RequestLogger;
use std::fs;
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};
use tokio_rustls::server::TlsStream;
static ACTIVE_REQUESTS: AtomicUsize = AtomicUsize::new(0);
pub async fn serve_file(
stream: &mut TlsStream<TcpStream>,
file_path: &Path,
@ -29,7 +32,15 @@ pub async fn handle_connection(
mut stream: TlsStream<TcpStream>,
dir: &str,
expected_host: &str,
max_concurrent_requests: usize,
) -> io::Result<()> {
// Check concurrent request limit
let current = ACTIVE_REQUESTS.fetch_add(1, Ordering::Relaxed);
if current >= max_concurrent_requests {
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return send_response(&mut stream, "41 Server unavailable\r\n").await;
}
const MAX_REQUEST_SIZE: usize = 4096;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
@ -58,12 +69,14 @@ pub async fn handle_connection(
if request.is_empty() {
let logger = RequestLogger::new(&stream, request);
logger.log_error(59, "Empty request");
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return send_response(&mut stream, "59 Bad Request\r\n").await;
}
if request.len() > 1024 {
let logger = RequestLogger::new(&stream, request);
logger.log_error(59, "Request too large");
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return send_response(&mut stream, "59 Bad Request\r\n").await;
}
@ -73,6 +86,7 @@ pub async fn handle_connection(
Err(_) => {
let logger = RequestLogger::new(&stream, request);
logger.log_error(59, "Invalid URL format");
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return send_response(&mut stream, "59 Bad Request\r\n").await;
}
};
@ -85,6 +99,7 @@ pub async fn handle_connection(
Ok(fp) => fp,
Err(PathResolutionError::NotFound) => {
logger.log_error(51, "File not found");
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return send_response(&mut stream, "51 Not found\r\n").await;
}
};
@ -95,7 +110,7 @@ pub async fn handle_connection(
Err(_) => {
// This shouldn't happen since we check existence, but handle gracefully
logger.log_error(51, "File not found");
send_response(&mut stream, "51 Not found\r\n").await?;
let _ = send_response(&mut stream, "51 Not found\r\n").await;
}
}
},
@ -103,7 +118,7 @@ pub async fn handle_connection(
// Read failed, check error type
let request_str = String::from_utf8_lossy(&request_buf).trim().to_string();
let logger = RequestLogger::new(&stream, request_str);
match e.kind() {
tokio::io::ErrorKind::InvalidData => {
logger.log_error(59, "Request too large");
@ -114,6 +129,7 @@ pub async fn handle_connection(
let _ = send_response(&mut stream, "59 Bad Request\r\n").await;
}
}
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
},
Err(_) => {
// Timeout
@ -121,10 +137,12 @@ pub async fn handle_connection(
let logger = RequestLogger::new(&stream, request_str);
logger.log_error(41, "Server unavailable");
let _ = send_response(&mut stream, "41 Server unavailable\r\n").await;
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
return Ok(());
}
}
ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
Ok(())
}