#!/usr/bin/env python3
"""
Gemini Test Client

A simple Gemini protocol client for testing Gemini servers. It sends one or
more concurrent requests and reports the status line of each response, which
makes it useful for exercising rate limiting. Used by integration tests to
validate server behavior.

Usage:
    python3 tests/gemini_test_client.py --url gemini://example.com/ --timeout 10
    python3 tests/gemini_test_client.py --limit 20 --url gemini://localhost/big-file.mkv
"""

import argparse
import socket
import ssl
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Test Gemini rate limiting with concurrent requests')
    parser.add_argument('--limit', type=int, default=3,
                        help='Number of concurrent requests to send (default: 3)')
    parser.add_argument('--host', default='localhost',
                        help='Server host (default: localhost)')
    parser.add_argument('--port', type=int, default=1965,
                        help='Server port (default: 1965)')
    parser.add_argument('--delay', type=float, default=0.1,
                        help='Delay between request start and connection close (default: 0.1s)')
    parser.add_argument('--timeout', type=float, default=5.0,
                        help='Socket timeout in seconds (default: 5.0)')
    parser.add_argument('--url', default='gemini://localhost/big-file.mkv',
                        help='Gemini URL to request (default: gemini://localhost/big-file.mkv)')

    args = parser.parse_args()

    # Validation
    if args.limit < 1:
        parser.error("Limit must be at least 1")
    if args.limit > 10000:
        parser.error("Limit too high (max 10000 for safety)")
    if args.delay < 0:
        parser.error("Delay must be non-negative")
    if args.timeout <= 0:
        parser.error("Timeout must be positive")

    return args


def send_gemini_request(host, port, url, delay, timeout):
    """Send one Gemini request and return its status line (or an error string)."""
    try:
        # Create an SSL context that skips certificate verification;
        # Gemini servers typically use self-signed certificates.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        # Connect with timeout
        sock = socket.create_connection((host, port), timeout=timeout)
        ssl_sock = context.wrap_socket(sock, server_hostname=host)

        # Send the request: a single URL terminated by CRLF
        request = f"{url}\r\n".encode('utf-8')
        ssl_sock.sendall(request)

        # Read the response header with timeout
        ssl_sock.settimeout(timeout)
        response = ssl_sock.recv(1024)

        if not response:
            return "Error: Empty response"

        status = response.decode('utf-8', errors='ignore').split('\r\n')[0]

        # Keep connection alive briefly if requested
        if delay > 0:
            time.sleep(delay)

        ssl_sock.close()
        return status

    except socket.timeout:
        return "Error: Timeout"
    except ConnectionRefusedError:
        return "Error: Connection refused"
    except Exception as e:
        return f"Error: {e}"


def main():
    """Run concurrent requests and report the results."""
    args = parse_args()

    if args.limit == 1:
        print("Testing single request (debug mode)...")
        start_time = time.time()
        result = send_gemini_request(args.host, args.port, args.url, args.delay, args.timeout)
        duration = time.time() - start_time
        print(f"Result: {result}")
        print(f"Duration: {duration:.2f}s")
        return

    print(f"Testing rate limiting with {args.limit} concurrent requests (using multiprocessing)...")
    print(f"Server: {args.host}:{args.port}")
    print(f"URL: {args.url}")
    print(f"Delay: {args.delay}s, Timeout: {args.timeout}s")
    print()

    start_time = time.time()

    # Use ProcessPoolExecutor for true parallelism (bypasses the GIL);
    # cap workers to avoid overloading the test machine.
    results = []
    max_workers = min(args.limit, multiprocessing.cpu_count() * 4)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(send_gemini_request, args.host, args.port, args.url, args.delay, args.timeout)
            for _ in range(args.limit)
        ]

        for future in as_completed(futures):
            results.append(future.result())

    elapsed = time.time() - start_time

    # Analyze results
    status_counts = {}
    connection_refused = 0
    timeouts = 0
    other_errors = []

    for result in results:
        if "Connection refused" in result:
            connection_refused += 1
        elif "Timeout" in result:
            timeouts += 1
        elif result.startswith("Error"):
            other_errors.append(result)
        else:
            status_counts[result] = status_counts.get(result, 0) + 1

    # Print results
    print("Results:")
    for status, count in sorted(status_counts.items()):
        print(f"  {status}: {count}")

    if connection_refused > 0:
        print(f"  Connection refused: {connection_refused} (server overloaded)")
    if timeouts > 0:
        print(f"  Timeouts: {timeouts} (server unresponsive)")
    if other_errors:
        print(f"  Other errors: {len(other_errors)}")
        for error in other_errors[:3]:
            print(f"    {error}")
        if len(other_errors) > 3:
            print(f"    ... and {len(other_errors) - 3} more")

    print()
    print(f"Total time: {elapsed:.2f}s")

    # Success criteria for rate limiting. Match on the status code prefix so
    # any MIME type (for 20) or status text (for 41) is counted.
    success_20 = sum(n for status, n in status_counts.items() if status.startswith("20"))
    rate_limited_41 = sum(n for status, n in status_counts.items() if status.startswith("41"))
    total_successful = success_20 + rate_limited_41 + connection_refused
    total_processed = total_successful + timeouts

    print("\nAnalysis:")
    print(f"  Total requests sent: {args.limit}")
    print(f"  Successfully processed: {total_successful}")
    print(f"  Timeouts (server unresponsive): {timeouts}")

    if args.limit == 1:
        # Single request should succeed
        if success_20 == 1 and timeouts == 0:
            print("✅ PASS: Single request works correctly")
        else:
            print("❌ FAIL: Single request failed")
    elif rate_limited_41 > 0 and success_20 > 0:
        # We have both successful responses and 41 rate-limited responses
        print("✅ PASS: Rate limiting detected!")
        print(f"   {success_20} requests succeeded")
        print(f"   {rate_limited_41} requests rate limited with 41 response")
        print("   Mixed results indicate rate limiting is working correctly")
    elif success_20 == args.limit and timeouts == 0:
        # All requests succeeded
        print("⚠️  All requests succeeded - rate limiting may not be triggered")
        print("   This could mean:")
        print("   - Requests are not truly concurrent")
        print("   - Processing is too fast for overlap")
        print("   - Need longer delays or more concurrent requests")
    else:
        print("❓ UNCLEAR: Check server logs and test parameters")
        print("   May need to adjust --limit, delays, or server configuration")


if __name__ == '__main__':
    main()
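
# The module docstring notes that integration tests drive this client. As a
# minimal sketch (not part of this script), a pytest-style test could run it
# via subprocess and assert on the printed verdict. The test name and the
# assumption that a rate-limited server is already listening on the default
# localhost:1965 are hypothetical:
#
#     import subprocess
#     import sys
#
#     def test_rate_limiting_via_client():
#         proc = subprocess.run(
#             [sys.executable, "tests/gemini_test_client.py", "--limit", "10"],
#             capture_output=True, text=True, timeout=60,
#         )
#         assert "PASS: Rate limiting detected!" in proc.stdout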