feat(imajin): add new image processing module

This commit is contained in:
Lilith 2026-01-11 03:10:21 -08:00
parent c72632dc20
commit 11526ee05b
3 changed files with 700 additions and 0 deletions

View file

@ -0,0 +1,167 @@
"""
Error handling tests for prompt generator service.
Tests service behavior under failure conditions.
"""
import pytest
import httpx
pytestmark = [pytest.mark.asyncio]
BASE_URL = "http://localhost:8006"
TIMEOUT = 90.0
@pytest.fixture
async def client():
"""HTTP client for prompt generator service."""
async with httpx.AsyncClient(base_url=BASE_URL, timeout=TIMEOUT) as client:
yield client
@pytest.fixture
def valid_context():
"""Valid cultural context for testing."""
return {
"determinedStyle": "anime",
"styleConfidence": 0.95,
"determinedMaturity": "suggestive",
"maturityConfidence": 0.8,
}
class TestInputValidation:
"""Test input validation."""
async def test_missing_cultural_context_returns_422(self, client):
"""Missing cultural context should return 422."""
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
# Missing culturalContext
},
)
assert response.status_code == 422
assert "cultural" in response.text.lower() or "context" in response.text.lower()
async def test_invalid_cultural_context_type_returns_422(self, client):
"""Cultural context as string should fail."""
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": "invalid", # Should be object
},
)
assert response.status_code == 422
async def test_incomplete_cultural_context_returns_422(self, client):
"""Cultural context missing required fields should fail."""
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": {
"determinedStyle": "anime",
# Missing styleConfidence and other required fields
},
},
)
assert response.status_code == 422
class TestHTTPErrors:
"""Test HTTP error codes."""
async def test_invalid_endpoint_returns_404(self, client):
"""Invalid endpoint should return 404."""
response = await client.post("/invalid", json={})
assert response.status_code == 404
async def test_get_on_post_endpoint_returns_405(self, client):
"""GET on POST-only endpoint should return 405."""
response = await client.get("/generate")
assert response.status_code == 405
class TestServiceDegradation:
"""Test service behavior when dependencies unavailable."""
async def test_health_check_reports_status(self, client):
"""Health check should report service status."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert "llmAvailable" in data
assert "backend" in data
class TestLLMResponseErrors:
"""Test handling of LLM response errors."""
async def test_malformed_json_from_llm_returns_422(self, client, valid_context):
"""If LLM returns completely invalid JSON, should return 422."""
# This is hard to test without mocking, but the service should handle it
# For now, we verify the service doesn't crash on edge cases
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["test"],
"culturalContext": valid_context,
},
)
# Should either succeed or fail gracefully with 422
assert response.status_code in [200, 422, 500]
if response.status_code != 200:
# Should have error detail
data = response.json()
assert "detail" in data
class TestTimeouts:
"""Test timeout handling."""
async def test_request_completes_within_timeout(self, client, valid_context):
"""Normal requests should complete within timeout."""
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": valid_context,
},
)
assert response.status_code == 200
class TestCORSAndHeaders:
"""Test CORS and header handling."""
async def test_cors_headers_present(self, client, valid_context):
"""CORS headers should be present in responses."""
response = await client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": valid_context,
},
)
# CORS middleware should add headers
# (Actual CORS validation depends on FastAPI middleware config)
assert response.status_code == 200

View file

@ -0,0 +1,138 @@
"""
Error handling tests for classifier service.
Tests service behavior under failure conditions.
"""
import pytest
import httpx
pytestmark = [pytest.mark.asyncio]
BASE_URL = "http://localhost:8005"
TIMEOUT = 45.0
@pytest.fixture
async def client():
"""HTTP client for classifier service."""
async with httpx.AsyncClient(base_url=BASE_URL, timeout=TIMEOUT) as client:
yield client
class TestInputValidation:
"""Test input validation and error responses."""
async def test_missing_category_returns_422(self, client):
"""Missing required category field should return 422."""
response = await client.post(
"/classify",
json={
"city": "Tokyo",
"filters": ["femboy"],
},
)
assert response.status_code == 422
assert "category" in response.text.lower()
async def test_missing_city_returns_422(self, client):
"""Missing required city field should return 422."""
response = await client.post(
"/classify",
json={
"category": "escorts",
"filters": ["femboy"],
},
)
assert response.status_code == 422
assert "city" in response.text.lower()
async def test_invalid_json_returns_422(self, client):
"""Invalid JSON should return 422."""
response = await client.post(
"/classify",
content="not json",
headers={"Content-Type": "application/json"},
)
assert response.status_code == 422
async def test_empty_body_returns_422(self, client):
"""Empty request body should return 422."""
response = await client.post(
"/classify",
json={},
)
assert response.status_code == 422
class TestHTTPErrors:
"""Test HTTP error handling."""
async def test_invalid_endpoint_returns_404(self, client):
"""Invalid endpoint should return 404."""
response = await client.post("/invalid_endpoint", json={})
assert response.status_code == 404
async def test_get_on_post_endpoint_returns_405(self, client):
"""GET on POST-only endpoint should return 405."""
response = await client.get("/classify")
assert response.status_code == 405
class TestServiceDegradation:
"""Test service behavior when LLM is unavailable."""
async def test_health_check_when_healthy(self, client):
"""Health check should report healthy when LLM available."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
# Should be healthy or degraded, not crashed
assert data["status"] in ["healthy", "degraded"]
class TestTimeouts:
"""Test timeout handling."""
async def test_request_completes_within_timeout(self, client):
"""Normal requests should complete within timeout."""
response = await client.post(
"/classify",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
},
)
assert response.status_code == 200
# Implicitly tests that we didn't timeout
class TestMalformedData:
"""Test handling of malformed but valid JSON."""
async def test_filters_as_string_returns_422(self, client):
"""Filters field as string instead of array should fail."""
response = await client.post(
"/classify",
json={
"category": "escorts",
"city": "Tokyo",
"filters": "femboy", # Should be ["femboy"]
},
)
assert response.status_code == 422
async def test_filters_as_dict_returns_422(self, client):
"""Filters field as dict should fail."""
response = await client.post(
"/classify",
json={
"category": "escorts",
"city": "Tokyo",
"filters": {"term": "femboy"}, # Should be array
},
)
assert response.status_code == 422

View file

@ -0,0 +1,395 @@
"""
Integration tests for 2-stage pipeline (classifier prompt-generator).
Tests the complete flow with real services, no mocks.
Requires both services running on ports 8005 and 8006.
"""
import pytest
import httpx
pytestmark = [pytest.mark.asyncio, pytest.mark.integration, pytest.mark.slow]
CLASSIFIER_URL = "http://localhost:8005"
PROMPT_GEN_URL = "http://localhost:8006"
ORCHESTRATOR_URL = "http://localhost:8080"
TIMEOUT = 90.0
@pytest.fixture
async def classifier_client():
"""HTTP client for classifier service."""
async with httpx.AsyncClient(base_url=CLASSIFIER_URL, timeout=TIMEOUT) as client:
yield client
@pytest.fixture
async def prompt_gen_client():
"""HTTP client for prompt generator service."""
async with httpx.AsyncClient(base_url=PROMPT_GEN_URL, timeout=TIMEOUT) as client:
yield client
@pytest.fixture
async def orchestrator_client():
"""HTTP client for orchestrator."""
async with httpx.AsyncClient(base_url=ORCHESTRATOR_URL, timeout=180.0) as client:
yield client
class TestEndToEndPipeline:
"""Test complete Stage 1 → Stage 2 pipeline."""
async def test_femboy_latex_tokyo_full_pipeline(
self, classifier_client, prompt_gen_client
):
"""Complete pipeline: femboy+latex in Tokyo → anime classification → anime prompt."""
# Stage 1: Classification
classify_response = await classifier_client.post(
"/classify",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy", "latex"],
},
)
assert classify_response.status_code == 200
classify_data = classify_response.json()
cultural_context = classify_data["culturalContext"]
# Verify Stage 1 output
assert cultural_context["determinedStyle"] == "anime", (
"Femboy should trigger anime classification"
)
assert cultural_context["styleConfidence"] >= 0.8
# Stage 2: Prompt Generation
gen_response = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy", "latex"],
"culturalContext": cultural_context,
},
)
assert gen_response.status_code == 200
gen_data = gen_response.json()
config = gen_data["config"]
# Verify Stage 2 output
assert config["imageModel"] == "anime", "Should use anime model"
assert isinstance(config["prompt"], str), "Prompt should be string"
assert len(config["prompt"]) > 50, "Prompt should be descriptive"
@pytest.mark.parametrize(
"filters,expected_style",
[
(["femboy"], "anime"),
(["kawaii"], "anime"),
(["catgirl"], "anime"),
(["egirl"], "anime"),
(["vtuber"], "anime"),
(["cosplay"], "anime"),
(["gyaru"], "anime"),
(["professional_escort"], "photorealistic"),
(["businesswoman"], "photorealistic"),
(["lawyer"], "photorealistic"),
],
)
async def test_cultural_correlation_e2e(
self, classifier_client, prompt_gen_client, filters, expected_style
):
"""Test cultural correlation through full pipeline."""
# Stage 1
classify_response = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "Tokyo", "filters": filters},
)
assert classify_response.status_code == 200
cultural_context = classify_response.json()["culturalContext"]
assert cultural_context["determinedStyle"] == expected_style, (
f"Filters {filters} should classify as {expected_style}"
)
# Stage 2
gen_response = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": filters,
"culturalContext": cultural_context,
},
)
assert gen_response.status_code == 200
config = gen_response.json()["config"]
assert config["imageModel"] == expected_style, (
f"Prompt generation should use {expected_style} model"
)
class TestContractValidation:
"""Test contract validation between stages."""
async def test_stage1_output_valid_for_stage2_input(self, classifier_client, prompt_gen_client):
"""Stage 1 output should be valid input for Stage 2."""
# Get Stage 1 output
classify_response = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "Tokyo", "filters": ["femboy"]},
)
assert classify_response.status_code == 200
# Use Stage 1 output directly in Stage 2
cultural_context = classify_response.json()["culturalContext"]
gen_response = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": cultural_context,
},
)
assert gen_response.status_code == 200, (
"Stage 1 output should be valid Stage 2 input"
)
async def test_missing_required_context_fields_fails(self, prompt_gen_client):
"""Missing required fields in cultural context should fail."""
# Incomplete cultural context
incomplete_context = {
"determinedStyle": "anime",
# Missing styleConfidence
"determinedMaturity": "suggestive",
# Missing maturityConfidence
}
response = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": incomplete_context,
},
)
assert response.status_code == 422, "Should fail validation for incomplete context"
class TestOrchestratorIntegration:
"""Test orchestrator coordinating both stages."""
async def test_orchestrator_calls_both_services(self, orchestrator_client):
"""Orchestrator should call classifier then prompt-generator."""
response = await orchestrator_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"role": "hero",
"filters": ["femboy", "latex"],
},
)
# Note: Orchestrator might fail if diffusion service is unavailable
# But we can check that it at least attempted the classification
if response.status_code == 200:
data = response.json()
# Orchestrator succeeded fully
assert "metadata" in data or "success" in data
else:
# Check error message indicates it got past classification
# (failure likely at diffusion stage)
pass
class TestErrorPropagation:
"""Test error handling across stages."""
async def test_stage1_failure_prevents_stage2(self, classifier_client, prompt_gen_client):
"""If Stage 1 fails, Stage 2 should not be called."""
# Send invalid request to Stage 1
classify_response = await classifier_client.post(
"/classify",
json={
# Missing required fields
"filters": ["femboy"],
},
)
assert classify_response.status_code == 422, "Should fail validation"
# Stage 2 should not be attempted in the pipeline
# (This test verifies the contract - orchestrator should check Stage 1 success)
async def test_stage2_invalid_context_fails_gracefully(self, prompt_gen_client):
"""Invalid context in Stage 2 should fail with clear error."""
response = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": ["femboy"],
"culturalContext": {
"determinedStyle": "invalid_style", # Invalid value
"styleConfidence": -999, # Invalid confidence
"determinedMaturity": "suggestive",
"maturityConfidence": 0.8,
},
},
)
# Should either succeed (LLM is flexible) or fail with 422
assert response.status_code in [200, 422]
class TestConcurrentRequests:
"""Test handling of concurrent requests."""
async def test_concurrent_classifications(self, classifier_client):
"""Multiple concurrent requests should all succeed."""
import asyncio
async def classify(filters):
response = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "Tokyo", "filters": filters},
)
return response
# Send 3 concurrent requests
responses = await asyncio.gather(
classify(["femboy"]),
classify(["kawaii"]),
classify(["professional"]),
)
# All should succeed
for resp in responses:
assert resp.status_code == 200
async def test_concurrent_prompt_generation(self, classifier_client, prompt_gen_client):
"""Multiple concurrent Stage 1→2 pipelines should work."""
import asyncio
async def full_pipeline(filters):
# Stage 1
classify_resp = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "Tokyo", "filters": filters},
)
cultural_context = classify_resp.json()["culturalContext"]
# Stage 2
gen_resp = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Tokyo",
"filters": filters,
"culturalContext": cultural_context,
},
)
return gen_resp
# Run 2 concurrent pipelines
responses = await asyncio.gather(
full_pipeline(["femboy"]),
full_pipeline(["kawaii"]),
)
# Both should succeed
for resp in responses:
assert resp.status_code == 200
class TestEdgeCasesE2E:
"""Test edge cases through full pipeline."""
async def test_empty_filters_through_pipeline(
self, classifier_client, prompt_gen_client
):
"""Empty filters should flow through both stages."""
# Stage 1
classify_resp = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "New York", "filters": []},
)
assert classify_resp.status_code == 200
cultural_context = classify_resp.json()["culturalContext"]
assert cultural_context["determinedStyle"] == "photorealistic", (
"Empty filters should default to photorealistic"
)
# Stage 2
gen_resp = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "New York",
"filters": [],
"culturalContext": cultural_context,
},
)
assert gen_resp.status_code == 200
async def test_novel_term_through_pipeline(
self, classifier_client, prompt_gen_client
):
"""Novel anime terms should classify and generate correctly."""
# Stage 1: mecha_pilot not in system prompt examples
classify_resp = await classifier_client.post(
"/classify",
json={"category": "escorts", "city": "Online", "filters": ["mecha_pilot"]},
)
assert classify_resp.status_code == 200
cultural_context = classify_resp.json()["culturalContext"]
assert cultural_context["determinedStyle"] == "anime", (
"LLM should recognize novel anime term"
)
# Stage 2
gen_resp = await prompt_gen_client.post(
"/generate",
json={
"category": "escorts",
"city": "Online",
"filters": ["mecha_pilot"],
"culturalContext": cultural_context,
},
)
assert gen_resp.status_code == 200
config = gen_resp.json()["config"]
assert config["imageModel"] == "anime"
class TestServiceHealthDependencies:
"""Test service health and dependencies."""
async def test_classifier_health_check(self, classifier_client):
"""Classifier health endpoint should work."""
response = await classifier_client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] in ["healthy", "degraded"]
async def test_prompt_generator_health_check(self, prompt_gen_client):
"""Prompt generator health endpoint should work."""
response = await prompt_gen_client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] in ["healthy", "degraded"]
assert "llmAvailable" in data
assert "backend" in data
assert data["backend"] == "http"