"""End-to-end pipeline test for a single cut (C01).

Tests the full flow:
    storyboard → image prompt assembly → image gen → TTS →
    video gen (img2vid) → audio mux → final clip with voice.

Cost: about 0.0025 pollen for the image plus 0.003 pollen per second of
generated video (the exact estimate is printed in the run summary).

The process exits with status 1 when a step cannot complete, so the script
can be used from CI or shell pipelines.
"""

import asyncio
import json
import math
import sys
import time
from pathlib import Path
from typing import Optional, Tuple

# Put the current directory on the import path before the project imports
# below (presumably so ``app`` resolves when run from the repository root).
sys.path.insert(0, ".")

from app.utils.prompt_builder import build_image_prompt, _build_identity_anchor
from app.services.pollinations import generate_image, generate_video, upload_media
from app.services.tts import generate_tts
from app.services.ffmpeg import mux_audio, get_duration

OUTPUT_DIR = Path("test_cut01_output")
STORYBOARD_PATH = Path("test_storyboard_output.json")
CUT_ID = "C01"

# Model priority: try imagen-4, fallback to grok-imagine, then flux
IMAGE_MODELS = ("imagen-4", "grok-imagine", "flux")

# Bounds applied to the requested video length, in seconds.
MIN_VIDEO_SEC = 1
MAX_VIDEO_SEC = 10

# Rates used for the cost estimate in the summary.
# NOTE(review): values come from the summary text this script has always
# printed; confirm against current Pollinations pricing.
IMAGE_COST_POLLEN = 0.0025
VIDEO_COST_POLLEN_PER_SEC = 0.003

# Characters (same as test_storyboard.py) — name -> visual_prompt
CHARACTERS = {
    "Ye Chen": (
        "young adult male, jet black short messy hair with side-swept bangs, "
        "deep amber eyes, fair skin, athletic build, "
        "wearing white inner sect disciple robes with silver trim, "
        "determined intense expression"
    ),
    "Gu Changge": (
        "young adult male, long flowing silver-white hair, "
        "cold piercing violet eyes, pale ivory skin, tall elegant build, "
        "wearing luxurious dark purple and gold noble robes with intricate embroidery, "
        "calm indifferent expression"
    ),
    "Taixuan Holy Lord": (
        "middle-aged male, dark brown hair tied in a topknot, "
        "golden glowing eyes, warm bronze skin, imposing muscular build, "
        "wearing ornate golden and white holy lord ceremonial robes with dragon motifs, "
        "stern authoritative expression"
    ),
    "Holy Maiden": (
        "young adult female, long flowing black hair with jade hairpin, "
        "autumn water-like gentle brown eyes, porcelain skin, slim graceful build, "
        "wearing elegant green silk dress with fluttering sleeves, "
        "ethereal calm expression like a fairy"
    ),
}

# Voice configs
VOICE_CONFIGS = {
    "Ye Chen": {"voice_name": "en-US-AndrewNeural", "rate": "+5%", "pitch": "+2Hz"},
    "Narrator": {"voice_name": "en-US-GuyNeural", "rate": "-5%", "pitch": "-5Hz"},
    "Taixuan Holy Lord": {"voice_name": "en-US-RogerNeural", "rate": "-10%", "pitch": "-10Hz"},
}


def _banner(title: str, rule: str = "-") -> None:
    """Print a section header framed by two 70-character rules."""
    print(f"\n{rule * 70}")
    print(title)
    print(f"{rule * 70}")


def _load_cut(storyboard_path: Path, cut_id: str) -> Optional[dict]:
    """Return the first cut with ``cut_id`` from the storyboard JSON, or None.

    Prints an ``ERROR:`` line and returns None when the storyboard file is
    missing or contains no matching cut.
    """
    try:
        with open(storyboard_path, "r", encoding="utf-8") as handle:
            storyboard = json.load(handle)
    except FileNotFoundError:
        print("ERROR: Run test_storyboard.py first to generate the storyboard")
        return None

    # A scene lists its cuts under "cuts", or under "shots" as a fallback.
    all_cuts = (
        cut
        for scene in storyboard.get("scenes", [])
        for cut in scene.get("cuts", scene.get("shots", []))
    )
    found = next((cut for cut in all_cuts if cut.get("cut_id") == cut_id), None)
    if found is None:
        print(f"ERROR: {cut_id} not found in storyboard")
    return found


def _build_video_prompt(cut: dict) -> str:
    """Join the cut's video prompt and action description, with a default."""
    parts = [cut[key] for key in ("video_prompt", "action_description") if cut.get(key)]
    return ", ".join(parts) if parts else "subtle idle animation"


async def _generate_keyframe(image_prompt: str, image_path: Path) -> bool:
    """Create the keyframe image, trying each model in ``IMAGE_MODELS`` in order.

    An existing file at ``image_path`` is reused. Returns True when an image
    is available afterwards, False when every model failed.
    """
    if image_path.exists():
        print(f" CACHED: {image_path}")
        return True

    last_index = len(IMAGE_MODELS) - 1
    for index, model in enumerate(IMAGE_MODELS):
        started = time.perf_counter()
        print(f" Trying {model}...")
        try:
            await generate_image(
                prompt=image_prompt,
                output_path=str(image_path),
                model=model,
                width=1024,
                height=768,
            )
        except Exception as exc:
            # Any failure of one model is reported and the next one is tried.
            print(f" FAILED ({model}): {exc}")
            if index < last_index:
                print(" Trying next model...")
                await asyncio.sleep(3)
        else:
            elapsed = time.perf_counter() - started
            print(f" OK — {model}, saved to {image_path} ({elapsed:.1f}s)")
            return True

    print(" All models failed!")
    return False


async def _synthesize_voice(dialogue: dict, audio_path: Path) -> float:
    """Produce the dialogue audio and return its duration in seconds.

    An existing file at ``audio_path`` is reused and measured with
    ``get_duration``. Speakers without an entry in ``VOICE_CONFIGS`` use the
    "Narrator" voice.
    """
    speaker = dialogue["speaker"]
    voice_config = VOICE_CONFIGS.get(speaker, VOICE_CONFIGS["Narrator"])

    if audio_path.exists():
        print(f" CACHED: {audio_path}")
        return await get_duration(str(audio_path))

    started = time.perf_counter()
    emotion = dialogue.get("emotion", "neutral")
    print(f" Speaker: {speaker}")
    print(f" Voice: {voice_config['voice_name']}")
    print(f" Emotion: {emotion}")
    print(f" Text: {dialogue['text'][:100]}...")

    tts_result = await generate_tts(
        text=dialogue["text"],
        output_path=str(audio_path),
        voice_name=voice_config["voice_name"],
        rate=voice_config.get("rate", "+0%"),
        pitch=voice_config.get("pitch", "+0Hz"),
        emotion=emotion,
    )
    tts_duration = tts_result["duration_sec"]
    elapsed = time.perf_counter() - started
    print(f" OK — {tts_duration:.2f}s audio ({elapsed:.1f}s)")
    print(f" Word timestamps: {len(tts_result.get('word_timestamps', []))} words")
    return tts_duration


async def _generate_silent_video(
    image_path: Path,
    video_prompt: str,
    video_duration: int,
    silent_video_path: Path,
) -> None:
    """Upload the keyframe and generate the silent clip unless one already exists."""
    if silent_video_path.exists():
        print(f" CACHED (silent): {silent_video_path}")
        return

    # The timer covers both the upload and the video generation.
    started = time.perf_counter()
    print(" Uploading keyframe to media.pollinations.ai...")
    image_url = await upload_media(str(image_path))
    print(f" Image URL: {image_url[:80]}...")

    print(" Generating video via grok-video (this takes 2-3 minutes)...")
    await generate_video(
        prompt=video_prompt,
        output_path=str(silent_video_path),
        model="grok-video",
        duration=video_duration,
        image_url=image_url,
    )
    elapsed = time.perf_counter() - started
    file_size = silent_video_path.stat().st_size
    print(f" OK — {file_size // 1024}KB silent video ({elapsed:.1f}s)")


async def _mux_final_clip(
    silent_video_path: Path,
    audio_path: Path,
    final_clip_path: Path,
    tts_duration: float,
) -> Tuple[float, float, float]:
    """Mux the voice track into the silent clip.

    Returns ``(video_dur, audio_dur, final_dur)`` as reported by
    ``get_duration`` for the silent clip, the audio and the muxed clip.
    """
    started = time.perf_counter()
    video_dur = await get_duration(str(silent_video_path))
    audio_dur = await get_duration(str(audio_path))

    print(f" Silent video: {video_dur:.2f}s")
    print(f" TTS audio: {audio_dur:.2f}s")
    print(f" Mismatch: {abs(video_dur - audio_dur):.2f}s")

    # Informational only: describes the expected handling of the mismatch;
    # the actual behaviour is whatever mux_audio implements.
    if audio_dur > video_dur + 0.5:
        print(f" Strategy: freeze last frame + pad {audio_dur - video_dur:.1f}s")
    elif video_dur > audio_dur:
        print(" Strategy: trim video to match audio (-shortest)")
    else:
        print(" Strategy: durations match, simple remux")

    await mux_audio(
        video_path=str(silent_video_path),
        audio_path=str(audio_path),
        output_path=str(final_clip_path),
        duration_sec=tts_duration,
    )
    elapsed = time.perf_counter() - started
    final_dur = await get_duration(str(final_clip_path))
    final_size = final_clip_path.stat().st_size
    print(f" OK — final clip: {final_dur:.2f}s, {final_size // 1024}KB ({elapsed:.1f}s)")
    return video_dur, audio_dur, final_dur


async def main() -> int:
    """Run the six pipeline steps for cut C01 and print a summary.

    Intermediate artifacts already present in ``OUTPUT_DIR`` (keyframe,
    voice audio, silent video) are reused instead of regenerated.

    Returns:
        0 on success, 1 when the storyboard or the cut is unavailable, the
        cut has no dialogue text, or every image model failed.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # ═══ Step 1: Load storyboard ═══
    print("=" * 70)
    print("CUT 01 END-TO-END PIPELINE TEST")
    print("=" * 70)

    cut01 = _load_cut(STORYBOARD_PATH, CUT_ID)
    if cut01 is None:
        return 1

    # The TTS and mux steps need spoken text; fail clearly instead of raising
    # a TypeError/KeyError further down.
    dialogue = cut01.get("dialogue") or {}
    if not dialogue.get("text"):
        print(f"ERROR: {CUT_ID} has no dialogue text in storyboard")
        return 1

    print(f"\nCut: {cut01['cut_id']}")
    print(f"Shot type: {cut01['shot_type']}")
    print(f"Camera: {cut01['camera_movement']}")
    print(f"Duration: {cut01['duration_sec']}s")
    print(f"Characters: {cut01['characters_present']}")
    print(f"Focal: {cut01['focal_character']}")
    print(f"Speaker: {dialogue['speaker']}")
    print(f"Dialogue: {dialogue['text'][:80]}...")

    # ═══ Step 2: Assemble image prompt ═══
    _banner("STEP 2: Image Prompt Assembly")

    image_prompt = build_image_prompt(cut01, CHARACTERS)
    print(f"\nAssembled prompt ({len(image_prompt)} chars):")
    print(f"\n{image_prompt}")

    # Show identity anchor for reference
    for char_name in cut01["characters_present"]:
        if char_name in CHARACTERS:
            anchor = _build_identity_anchor(CHARACTERS[char_name])
            print(f"\nIdentity anchor for {char_name}: {anchor}")

    # ═══ Step 3: Generate image ═══
    _banner("STEP 3: Image Generation (imagen-4)")

    image_path = OUTPUT_DIR / "C01_keyframe.png"
    if not await _generate_keyframe(image_prompt, image_path):
        return 1

    # ═══ Step 4: Generate TTS ═══
    _banner("STEP 4: TTS Generation (Edge TTS)")

    audio_path = OUTPUT_DIR / "C01_voice.mp3"
    tts_duration = await _synthesize_voice(dialogue, audio_path)

    print(f"\n TTS duration: {tts_duration:.2f}s")
    print(f" Storyboard duration: {cut01['duration_sec']}s")

    # ═══ Step 5: Generate video (img2vid) ═══
    _banner("STEP 5: Video Generation (grok-video img2vid)")

    silent_video_path = OUTPUT_DIR / "C01_silent.mp4"
    # Whole seconds covering the speech, clamped to the allowed range.
    video_duration = max(MIN_VIDEO_SEC, min(math.ceil(tts_duration), MAX_VIDEO_SEC))
    video_prompt = _build_video_prompt(cut01)

    print(f" Video prompt: {video_prompt[:120]}...")
    print(f" Video duration: {video_duration}s (ceil of {tts_duration:.2f}s TTS)")

    await _generate_silent_video(image_path, video_prompt, video_duration, silent_video_path)

    # ═══ Step 6: Mux audio into video ═══
    _banner("STEP 6: Audio Mux (FFmpeg)")

    final_clip_path = OUTPUT_DIR / "C01_final.mp4"
    video_dur, audio_dur, final_dur = await _mux_final_clip(
        silent_video_path, audio_path, final_clip_path, tts_duration
    )

    # ═══ Summary ═══
    _banner("RESULTS", rule="=")
    print(f" Keyframe image: {image_path}")
    print(f" TTS audio: {audio_path} ({audio_dur:.2f}s)")
    print(f" Silent video: {silent_video_path} ({video_dur:.2f}s)")
    print(f" Final clip: {final_clip_path} ({final_dur:.2f}s)")

    # Compute the total from the rates so it stays consistent with the
    # breakdown for any clip length.
    estimated_cost = IMAGE_COST_POLLEN + VIDEO_COST_POLLEN_PER_SEC * video_duration
    print(
        f"\n Pollen cost: ~{estimated_cost:.4f} "
        f"({IMAGE_COST_POLLEN} image + {VIDEO_COST_POLLEN_PER_SEC}/s × {video_duration}s video)"
    )
    print(f"\n Open {OUTPUT_DIR.resolve()} to review the outputs!")
    print(f" Play {final_clip_path} to see the final result with voice!")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))