Unify integration test environment and add valid config validation

- Create shared tests/common.rs with TestEnvironment setup
- Simplify gemini_test_client.py to single-request client
- Refactor config validation tests to use common setup
- Add test_valid_config_startup for complete server validation
- Fix clippy warning in main.rs
- Remove unused code and consolidate test infrastructure
Jeena 2026-01-16 23:59:54 +00:00
parent 3e490d85ef
commit 01bcda10d0
5 changed files with 219 additions and 295 deletions
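The shared tests/common.rs helper named in the commit message is not part of the file shown below, so what follows is only a minimal sketch of what such a TestEnvironment commonly looks like; the field names, config file name, CLI flag, and server binary path are assumptions for illustration, not code from this commit.

// Hypothetical sketch of tests/common.rs (names and layout are guesses).
use std::fs;
use std::path::PathBuf;
use std::process::{Child, Command};

pub struct TestEnvironment {
    pub config_path: PathBuf,
    server: Option<Child>,
}

impl TestEnvironment {
    /// Write the given config into a temporary directory.
    pub fn new(config: &str) -> Self {
        let dir = std::env::temp_dir().join(format!("gemini-test-{}", std::process::id()));
        fs::create_dir_all(&dir).expect("create temp dir");
        let config_path = dir.join("config.toml"); // config file name is a guess
        fs::write(&config_path, config).expect("write config");
        TestEnvironment { config_path, server: None }
    }

    /// Start the server under test against the written config.
    pub fn start_server(&mut self) {
        let child = Command::new("target/debug/gemini-server") // binary path is a guess
            .arg("--config") // flag name is a guess
            .arg(&self.config_path)
            .spawn()
            .expect("spawn server");
        self.server = Some(child);
    }
}

impl Drop for TestEnvironment {
    fn drop(&mut self) {
        // Make sure the server process does not outlive the test.
        if let Some(child) = &mut self.server {
            let _ = child.kill();
        }
    }
}

A test such as test_valid_config_startup could then build a TestEnvironment with a known-good config, start the server, and drive it with the Python client from the diff below.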

tests/gemini_test_client.py

@@ -1,195 +1,71 @@
 #!/usr/bin/env python3
 """
-Gemini Test Client
+Simple Gemini Test Client
-A simple Gemini protocol client for testing Gemini servers.
-Used by integration tests to validate server behavior.
+Makes a single Gemini request and prints the status line.
+Used by integration tests for rate limiting validation.
-Usage:
-    python3 tests/gemini_test_client.py --url gemini://example.com/ --timeout 10
+Usage: python3 tests/gemini_test_client.py gemini://host:port/path
 """
-import argparse
 import sys
 import socket
 import ssl
-import time
-import multiprocessing
-from concurrent.futures import ProcessPoolExecutor, as_completed
-def parse_args():
-    """Parse command line arguments"""
-    parser = argparse.ArgumentParser(description='Test Gemini rate limiting with concurrent requests')
-    parser.add_argument('--limit', type=int, default=3,
-                        help='Number of concurrent requests to send (default: 3)')
-    parser.add_argument('--host', default='localhost',
-                        help='Server host (default: localhost)')
-    parser.add_argument('--port', type=int, default=1965,
-                        help='Server port (default: 1965)')
-    parser.add_argument('--delay', type=float, default=0.1,
-                        help='Delay between request start and connection close (default: 0.1s)')
-    parser.add_argument('--timeout', type=float, default=5.0,
-                        help='Socket timeout in seconds (default: 5.0)')
-    parser.add_argument('--url', default='gemini://localhost/big-file.mkv',
-                        help='Gemini URL to request (default: gemini://localhost/big-file.mkv)')
-    args = parser.parse_args()
-    # Validation
-    if args.limit < 1:
-        parser.error("Limit must be at least 1")
-    if args.limit > 10000:
-        parser.error("Limit too high (max 10000 for safety)")
-    if args.delay < 0:
-        parser.error("Delay must be non-negative")
-    if args.timeout <= 0:
-        parser.error("Timeout must be positive")
-    return args
-def send_gemini_request(host, port, url, delay, timeout):
-    """Send one Gemini request with proper error handling"""
+def main():
+    if len(sys.argv) != 2:
+        print("Usage: python3 gemini_test_client.py <gemini-url>", file=sys.stderr)
+        sys.exit(1)
+    url = sys.argv[1]
+    # Parse URL (basic parsing)
+    if not url.startswith('gemini://'):
+        print("Error: URL must start with gemini://", file=sys.stderr)
+        sys.exit(1)
+    url_parts = url[9:].split('/', 1)  # Remove gemini://
+    host_port = url_parts[0]
+    path = '/' + url_parts[1] if len(url_parts) > 1 else '/'
+    if ':' in host_port:
+        host, port = host_port.rsplit(':', 1)
+        port = int(port)
+    else:
+        host = host_port
+        port = 1965
     try:
-        # Create SSL context
+        # Create SSL connection
         context = ssl.create_default_context()
         context.check_hostname = False
         context.verify_mode = ssl.CERT_NONE
-        # Connect with timeout
-        sock = socket.create_connection((host, port), timeout=timeout)
+        sock = socket.create_connection((host, port), timeout=5.0)
         ssl_sock = context.wrap_socket(sock, server_hostname=host)
         # Send request
-        request = f"{url}\r\n".encode('utf-8')
-        ssl_sock.send(request)
-        # Read response with timeout
-        ssl_sock.settimeout(timeout)
-        response = ssl_sock.recv(1024)
-        if not response:
-            return "Error: Empty response"
-        status = response.decode('utf-8', errors='ignore').split('\r\n')[0]
-        # Keep connection alive briefly if requested
-        if delay > 0:
-            time.sleep(delay)
+        request = f"{url}\r\n"
+        ssl_sock.send(request.encode('utf-8'))
+        # Read response header
+        response = b''
+        while b'\r\n' not in response and len(response) < 1024:
+            data = ssl_sock.recv(1)
+            if not data:
+                break
+            response += data
         ssl_sock.close()
-        return status
-    except socket.timeout:
-        return "Error: Timeout"
-    except ConnectionRefusedError:
-        return "Error: Connection refused"
+        if response:
+            status_line = response.decode('utf-8', errors='ignore').split('\r\n')[0]
+            print(status_line)
+        else:
+            print("Error: No response")
     except Exception as e:
-        return f"Error: {e}"
-def main():
-    """Run concurrent requests"""
-    args = parse_args()
-    if args.limit == 1:
-        print("Testing single request (debug mode)...")
-        start_time = time.time()
-        result = send_gemini_request(args.host, args.port, args.url, args.delay, args.timeout)
-        end_time = time.time()
-        duration = end_time - start_time
-        print(f"Result: {result}")
-        print(".2f")
-        return
-    print(f"Testing rate limiting with {args.limit} concurrent requests (using multiprocessing)...")
-    print(f"Server: {args.host}:{args.port}")
-    print(f"URL: {args.url}")
-    print(f"Delay: {args.delay}s, Timeout: {args.timeout}s")
-    print()
-    start_time = time.time()
-    # Use ProcessPoolExecutor for true parallelism (bypasses GIL)
-    results = []
-    max_workers = min(args.limit, multiprocessing.cpu_count() * 4)  # Limit workers to avoid system overload
-    with ProcessPoolExecutor(max_workers=max_workers) as executor:
-        futures = [
-            executor.submit(send_gemini_request, args.host, args.port,
-                            args.url, args.delay, args.timeout)
-            for _ in range(args.limit)
-        ]
-        for future in as_completed(futures):
-            results.append(future.result())
-    elapsed = time.time() - start_time
-    # Analyze results
-    status_counts = {}
-    connection_refused = 0
-    timeouts = 0
-    other_errors = []
-    for result in results:
-        if "Connection refused" in result:
-            connection_refused += 1
-        elif "Timeout" in result:
-            timeouts += 1
-        elif result.startswith("Error"):
-            other_errors.append(result)
-        else:
-            status_counts[result] = status_counts.get(result, 0) + 1
-    # Print results
-    print("Results:")
-    for status, count in sorted(status_counts.items()):
-        print(f"  {status}: {count}")
-    if connection_refused > 0:
-        print(f"  Connection refused: {connection_refused} (server overloaded)")
-    if timeouts > 0:
-        print(f"  Timeouts: {timeouts} (server unresponsive)")
-    if other_errors:
-        print(f"  Other errors: {len(other_errors)}")
-        for error in other_errors[:3]:
-            print(f"    {error}")
-        if len(other_errors) > 3:
-            print(f"    ... and {len(other_errors) - 3} more")
-    print()
-    print(".2f")
-    # Success criteria for rate limiting
-    success_20 = status_counts.get("20 application/octet-stream", 0)
-    rate_limited_41 = status_counts.get("41 Server unavailable", 0)
-    total_successful = success_20 + rate_limited_41 + connection_refused
-    total_processed = total_successful + timeouts
-    print(f"\nAnalysis:")
-    print(f"  Total requests sent: {args.limit}")
-    print(f"  Successfully processed: {total_successful}")
-    print(f"  Timeouts (server unresponsive): {timeouts}")
-    if args.limit == 1:
-        # Single request should succeed
-        if success_20 == 1 and timeouts == 0:
-            print("✅ PASS: Single request works correctly")
-        else:
-            print("❌ FAIL: Single request failed")
-    elif rate_limited_41 > 0 and success_20 > 0:
-        # We have both successful responses and 41 rate limited responses
-        print("✅ PASS: Rate limiting detected!")
-        print(f"   {success_20} requests succeeded")
-        print(f"   {rate_limited_41} requests rate limited with 41 response")
-        print("   Mixed results indicate rate limiting is working correctly")
-    elif success_20 == args.limit and timeouts == 0:
-        # All requests succeeded
-        print("⚠️ All requests succeeded - rate limiting may not be triggered")
-        print("   This could mean:")
-        print("   - Requests are not truly concurrent")
-        print("   - Processing is too fast for overlap")
-        print("   - Need longer delays or more concurrent requests")
-    else:
-        print("❓ UNCLEAR: Check server logs and test parameters")
-        print("   May need to adjust --limit, delays, or server configuration")
+        print(f"Error: {e}")
 if __name__ == '__main__':
     main()
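For reference, a hedged sketch of how a Rust integration test might drive this simplified client; the helper name, port, and expected status code are illustrative assumptions rather than code from the repository.

// Hypothetical usage from a Rust integration test: run the Python client
// against an already-running server and assert on the status line it prints.
use std::process::Command;

fn request_status(url: &str) -> String {
    let output = Command::new("python3")
        .arg("tests/gemini_test_client.py")
        .arg(url)
        .output()
        .expect("failed to run gemini_test_client.py");
    // The client prints the Gemini status line (e.g. "20 text/gemini") to stdout.
    String::from_utf8_lossy(&output.stdout).trim().to_string()
}

#[test]
fn single_request_gets_a_success_status() {
    // Assumes a server instance is already listening on localhost:1965.
    let status = request_status("gemini://localhost:1965/");
    assert!(status.starts_with("20"), "unexpected status line: {status}");
}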