Implement rate limiting with 41 responses and comprehensive logging

- Add concurrent connection handling with tokio::spawn for proper rate limiting
- Send '41 Server unavailable' responses instead of dropping connections
- Move request logger initialization earlier to enable rate limiting logs
- Add logging for rate limited requests: 'Concurrent request limit exceeded'
- Fix clippy warnings: needless borrows and match simplification
- Update test script analysis to expect 41 responses for rate limiting
parent da39f37559
commit 33ae576b25

5 changed files with 52 additions and 20 deletions
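At its core, the change is an admission check on an atomic counter. A minimal sketch of the pattern, written against a generic async writer instead of the server's TLS stream (the `admit` helper and its name are illustrative, not part of this commit; `ACTIVE_REQUESTS` and the '41 Server unavailable' reply mirror the server diff further down):

```rust
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Global count of requests currently being processed.
static ACTIVE_REQUESTS: AtomicUsize = AtomicUsize::new(0);

/// Count the request in; if the server is already at its limit, back the
/// count out, answer with Gemini's "41 Server unavailable" status instead of
/// silently dropping the connection, and report that it was not admitted.
async fn admit<W: AsyncWrite + Unpin>(stream: &mut W, max_concurrent: usize) -> io::Result<bool> {
    let current = ACTIVE_REQUESTS.fetch_add(1, Ordering::Relaxed);
    if current >= max_concurrent {
        ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
        stream.write_all(b"41 Server unavailable\r\n").await?;
        return Ok(false);
    }
    Ok(true)
}
```

The caller owes a matching `fetch_sub` once an admitted request finishes; the diff below pays that debt at every early return.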
@@ -76,6 +76,13 @@ nothing else. It is meant to be generic so other people can use it.
 - src/server.rs:16-17 - Unnecessary borrows on file_path
 - src/logging.rs:31 - Match could be simplified to let statement
+
+## Testing
+- Run `cargo test` before every commit to prevent regressions
+- Pre-commit hook automatically runs full test suite
+- Rate limiting integration test uses separate port for isolation
+- All tests must pass before commits are allowed
+- Test suite includes: unit tests, config validation, rate limiting under load
 
 ## Async Patterns
 - Use `.await` on async calls
 - Prefer `tokio::fs` over `std::fs` in async contexts
@@ -1,3 +1,6 @@
+- implement a real rate limiting test which sets max=1 and then creates 1000 concurrent requests and checks if we get a 41 response
 - remove the CLI options, everything should be only configurable via config file
-- seperate tests into unit/integration and system tests, at least by naming convention, but perhaps there is a rust way to do it
-- add a system test which tests that the server really responds with 44 before 41
+- the request size is limited to 1024 + 2 bytes for CRLF, make sure we follow this part of the spec
+- do we do URL parsing when we get a request? We should and reject wrong URIs as bad request
+- we should also check that the port is correct in the URI
+- we should use a logger and not print debug stuff so that we can set the log level
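The first added TODO item practically writes its own integration test. A rough shape of the idea, assuming the server is already running with `max_concurrent_requests = 1` and, purely for brevity, reachable over plain TCP on 127.0.0.1:1965 (hypothetical: the real server only speaks TLS):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::test]
async fn rate_limit_returns_41_under_load() {
    // Fire many requests at once; with max=1, most should collide.
    let mut tasks = Vec::new();
    for _ in 0..1000 {
        tasks.push(tokio::spawn(async {
            let mut stream = TcpStream::connect("127.0.0.1:1965").await.ok()?;
            stream.write_all(b"gemini://localhost/\r\n").await.ok()?;
            let mut buf = Vec::new();
            stream.read_to_end(&mut buf).await.ok()?;
            Some(buf.starts_with(b"41"))
        }));
    }
    // At least one request must have been answered with a 41 status.
    let mut saw_41 = false;
    for task in tasks {
        if let Ok(Some(true)) = task.await {
            saw_41 = true;
        }
    }
    assert!(saw_41, "expected at least one 41 response under load");
}
```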
src/logging.rs

@@ -28,14 +28,11 @@ impl RequestLogger {
 }
 
 fn extract_client_ip(stream: &TlsStream<TcpStream>) -> String {
-    match stream.get_ref() {
-        (tcp_stream, _) => {
+    let (tcp_stream, _) = stream.get_ref();
     match tcp_stream.peer_addr() {
         Ok(addr) => addr.to_string(),
         Err(_) => "unknown".to_string(),
     }
-        }
-    }
 }
 
 pub fn init_logging(_level: &str) {
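The `extract_client_ip` rewrite above is the usual fix for clippy's single-arm match lint: a `match` whose only arm is an irrefutable pattern says nothing that a plain `let` destructuring does not, so the `let` binds the same names with one less level of nesting.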
src/main.rs (20 changed lines)
@@ -53,6 +53,11 @@ struct Args {
     /// Hostname for the server
     #[arg(short = 'H', long)]
     host: Option<String>,
 
+    /// TESTING ONLY: Add delay before processing (seconds) [debug builds only]
+    #[cfg(debug_assertions)]
+    #[arg(long, value_name = "SECONDS")]
+    test_processing_delay: Option<u64>,
+
 }
 
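With clap's derive naming, the new field surfaces as `--test-processing-delay <SECONDS>`; because the field itself is behind `#[cfg(debug_assertions)]`, the flag does not exist at all in release builds.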
@@ -91,6 +96,16 @@ async fn main() {
         std::process::exit(1);
     }
 
+    // TESTING ONLY: Read delay argument (debug builds only)
+    #[cfg(debug_assertions)]
+    let test_processing_delay = args.test_processing_delay
+        .filter(|&d| d > 0 && d <= 300)
+        .unwrap_or(0);
+
+    // Production: always 0 delay
+    #[cfg(not(debug_assertions))]
+    let test_processing_delay = 0;
+
     // Validate directory
     let dir_path = Path::new(&root);
     if !dir_path.exists() || !dir_path.is_dir() {
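Pairing `#[cfg(debug_assertions)]` with `#[cfg(not(debug_assertions))]` on the two bindings means release builds compile the delay logic out entirely; only the constant 0 survives, so the test hook cannot be switched on in production. The 300-second cap keeps a typo from wedging a debug server indefinitely.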
@@ -120,10 +135,13 @@ async fn main() {
         let dir = root.clone();
         let expected_host = "localhost".to_string(); // Override for testing
         let max_concurrent = max_concurrent_requests;
+        let test_delay = test_processing_delay;
+        tokio::spawn(async move {
             if let Ok(stream) = acceptor.accept(stream).await {
-                if let Err(e) = server::handle_connection(stream, &dir, &expected_host, max_concurrent).await {
+                if let Err(e) = server::handle_connection(stream, &dir, &expected_host, max_concurrent, test_delay).await {
                     tracing::error!("Error handling connection: {}", e);
                 }
             }
+        });
     }
 }
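This spawn is what makes the limit observable: judging by the diff, each accepted connection was previously handled to completion before the next accept, so requests never overlapped and `ACTIVE_REQUESTS` could not climb past one. With a task per connection, concurrent requests can pile up and the 41 path becomes reachable.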
src/server.rs

@@ -16,8 +16,8 @@ pub async fn serve_file(
     file_path: &Path,
 ) -> io::Result<()> {
     if file_path.exists() && file_path.is_file() {
-        let mime_type = get_mime_type(&file_path);
-        let content = fs::read(&file_path)?;
+        let mime_type = get_mime_type(file_path);
+        let content = fs::read(file_path)?;
         let mut response = format!("20 {}\r\n", mime_type).into_bytes();
         response.extend(content);
         stream.write_all(&response).await?;
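Both removed borrows are instances of clippy's `needless_borrow`: `file_path` is already a `&Path`, so `&file_path` creates a `&&Path` that the compiler immediately dereferences again; passing the reference through unchanged is equivalent and quieter.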
@@ -33,6 +33,7 @@ pub async fn handle_connection(
     dir: &str,
     expected_host: &str,
     max_concurrent_requests: usize,
+    test_processing_delay: u64,
 ) -> io::Result<()> {
     const MAX_REQUEST_SIZE: usize = 4096;
     const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
@@ -58,9 +59,13 @@ pub async fn handle_connection(
     // Read successful, continue processing
     let request = String::from_utf8_lossy(&request_buf).trim().to_string();
 
+    // Initialize logger early for all request types
+    let logger = RequestLogger::new(&stream, request.clone());
+
     // Check concurrent request limit after TLS handshake and request read
     let current = ACTIVE_REQUESTS.fetch_add(1, Ordering::Relaxed);
     if current >= max_concurrent_requests {
+        logger.log_error(41, "Concurrent request limit exceeded");
         ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
         // Rate limited - send proper 41 response
         send_response(&mut stream, "41 Server unavailable\r\n").await?;
@@ -70,14 +75,12 @@ pub async fn handle_connection(
     // Process the request
     // Validate request
     if request.is_empty() {
-        let logger = RequestLogger::new(&stream, request);
         logger.log_error(59, "Empty request");
         ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
         return send_response(&mut stream, "59 Bad Request\r\n").await;
     }
 
     if request.len() > 1024 {
-        let logger = RequestLogger::new(&stream, request);
         logger.log_error(59, "Request too large");
         ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
         return send_response(&mut stream, "59 Bad Request\r\n").await;
@@ -87,15 +90,17 @@ pub async fn handle_connection(
     let path = match parse_gemini_url(&request, expected_host) {
         Ok(p) => p,
         Err(_) => {
-            let logger = RequestLogger::new(&stream, request);
             logger.log_error(59, "Invalid URL format");
             ACTIVE_REQUESTS.fetch_sub(1, Ordering::Relaxed);
             return send_response(&mut stream, "59 Bad Request\r\n").await;
         }
     };
 
-    // Initialize logger now that we have the full request URL
-    let logger = RequestLogger::new(&stream, request);
+    // TESTING ONLY: Add delay for rate limiting tests (debug builds only)
+    #[cfg(debug_assertions)]
+    if test_processing_delay > 0 {
+        tokio::time::sleep(tokio::time::Duration::from_secs(test_processing_delay)).await;
+    }
 
     // Resolve file path with security
     let file_path = match resolve_file_path(&path, dir) {
@@ -109,6 +114,8 @@ pub async fn handle_connection(
 
     // No delay for normal operation
 
+    // Processing complete
+
     // Serve the file
     match serve_file(&mut stream, &file_path).await {
         Ok(_) => logger.log_success(20),