"""Full episode pipeline test — all 10 cuts from Chapter 1 storyboard. Generates portraits, images, TTS, video, muxes audio, and assembles final episode. Everything caches — safe to re-run if interrupted. Estimated cost: ~0.45 pollen (portraits + images + video) Estimated wall time: 30-45 minutes (mostly video generation) """ import asyncio import json import math import os import sys import time from pathlib import Path sys.path.insert(0, ".") from app.utils.prompt_builder import build_image_prompt, _build_identity_anchor from app.services.pollinations import ( generate_image as poll_generate_image, generate_video, upload_media, VISION_MODELS, ) from app.services.tts import generate_tts from app.services.ffmpeg import mux_audio, get_duration, concat_clips OUTPUT_DIR = Path("test_episode_output") PORTRAIT_DIR = OUTPUT_DIR / "portraits" IMAGE_DIR = OUTPUT_DIR / "images" AUDIO_DIR = OUTPUT_DIR / "audio" VIDEO_DIR = OUTPUT_DIR / "video" CLIP_DIR = OUTPUT_DIR / "clips" # ============================================================ # Character data # ============================================================ CHARACTERS = { "Ye Chen": { "visual_prompt": ( "young adult male, jet black short messy hair with side-swept bangs, " "deep amber eyes, fair skin, athletic build, " "wearing white inner sect disciple robes with silver trim, " "determined intense expression" ), "role": "protagonist", }, "Gu Changge": { "visual_prompt": ( "young adult male, long flowing silver-white hair, " "cold piercing violet eyes, pale ivory skin, tall elegant build, " "wearing luxurious dark purple and gold noble robes with intricate embroidery, " "calm indifferent expression" ), "role": "antagonist", }, "Taixuan Holy Lord": { "visual_prompt": ( "middle-aged male, dark brown hair tied in a topknot, " "golden glowing eyes, warm bronze skin, imposing muscular build, " "wearing ornate golden and white holy lord ceremonial robes with dragon motifs, " "stern authoritative expression" ), "role": "supporting", }, "Holy Maiden": { "visual_prompt": ( "young adult female, long flowing black hair with jade hairpin, " "autumn water-like gentle brown eyes, porcelain skin, slim graceful build, " "wearing elegant green silk dress with fluttering sleeves, " "ethereal calm expression like a fairy" ), "role": "supporting", }, } # Character name -> visual_prompt (flat dict for build_image_prompt) CHAR_VISUALS = {name: data["visual_prompt"] for name, data in CHARACTERS.items()} # ============================================================ # Voice configs (Edge TTS) # ============================================================ VOICE_CONFIGS = { "Ye Chen": {"voice_name": "en-US-AndrewNeural", "rate": "+5%", "pitch": "+2Hz"}, "Narrator": {"voice_name": "en-US-GuyNeural", "rate": "-5%", "pitch": "-3Hz"}, "Taixuan Holy Lord": {"voice_name": "en-US-RogerNeural", "rate": "-10%", "pitch": "-8Hz"}, } # ============================================================ # Model config # ============================================================ PORTRAIT_MODEL = "klein-large" PORTRAIT_SEED = 42 IMAGE_SEED = 42 CHARACTER_IMAGE_MODEL = "klein-large" # For cuts with focal character + portrait ref GENERIC_IMAGE_MODEL = "grok-imagine" # For cuts without characters (wide/establishing) VIDEO_MODEL = "grok-video" API_DELAY = 7 # seconds between Pollinations API calls (rate limit safety) # ============================================================ # Portrait prompt template # ============================================================ from app.utils.prompt_builder import MANHWA_STYLE_PREFIX PORTRAIT_PROMPT = ( "{style_prefix}, character portrait sheet, front-facing bust shot, " "{visual_prompt}, clean white background, reference sheet style, " "sharp details, no background elements, studio lighting, " "high detail face and eyes, character design reference" ) # Track costs total_cost = 0.0 def log_cost(label: str, amount: float): global total_cost total_cost += amount print(f" [COST] +{amount:.4f} pollen ({label}) | Running total: {total_cost:.4f}") # ============================================================ # Stage 1: Portrait Generation # ============================================================ async def generate_portraits(storyboard: dict) -> dict[str, str]: """Generate reference portraits for characters that appear as focal. Returns {character_name: portrait_url}.""" print("\n" + "=" * 70) print("STAGE 1: CHARACTER PORTRAITS") print("=" * 70) # Find which characters appear as focal in any cut focal_chars = set() for scene in storyboard.get("scenes", []): for cut in scene.get("cuts", scene.get("shots", [])): focal = cut.get("focal_character") if focal and focal in CHARACTERS: focal_chars.add(focal) print(f" Focal characters needing portraits: {sorted(focal_chars)}") portrait_urls = {} PORTRAIT_DIR.mkdir(parents=True, exist_ok=True) for i, name in enumerate(sorted(focal_chars)): portrait_path = str(PORTRAIT_DIR / f"{name.replace(' ', '_')}_portrait.png") url_cache_path = portrait_path + ".url" # Check cache if Path(url_cache_path).exists(): url = Path(url_cache_path).read_text().strip() portrait_urls[name] = url print(f"\n [{i+1}/{len(focal_chars)}] {name}: CACHED ({url[:60]}...)") continue print(f"\n [{i+1}/{len(focal_chars)}] {name}:") char_data = CHARACTERS[name] # Generate portrait image if Path(portrait_path).exists(): print(f" Image CACHED: {portrait_path}") else: prompt = PORTRAIT_PROMPT.format( style_prefix=MANHWA_STYLE_PREFIX, visual_prompt=char_data["visual_prompt"], ) print(f" Generating portrait ({PORTRAIT_MODEL}, seed={PORTRAIT_SEED})...") t0 = time.time() await poll_generate_image( prompt=prompt, output_path=portrait_path, model=PORTRAIT_MODEL, width=768, height=1024, seed=PORTRAIT_SEED, ) elapsed = time.time() - t0 print(f" OK - portrait saved ({elapsed:.1f}s)") log_cost(f"portrait {name}", 0.012) await asyncio.sleep(API_DELAY) # Upload to get permanent URL print(f" Uploading to media.pollinations.ai...") t0 = time.time() url = await upload_media(portrait_path) elapsed = time.time() - t0 print(f" URL: {url[:60]}... ({elapsed:.1f}s)") # Cache the URL Path(url_cache_path).write_text(url) portrait_urls[name] = url print(f"\n Portraits done: {len(portrait_urls)} characters") return portrait_urls # ============================================================ # Stage 2: Image Generation # ============================================================ async def generate_images( all_cuts: list[dict], portrait_urls: dict[str, str], ) -> None: """Generate keyframe images for all cuts.""" print("\n" + "=" * 70) print("STAGE 2: IMAGE GENERATION") print("=" * 70) IMAGE_DIR.mkdir(parents=True, exist_ok=True) total = len(all_cuts) for i, cut in enumerate(all_cuts): cut_id = cut["cut_id"] image_path = str(IMAGE_DIR / f"{cut_id}.png") cut["_image_path"] = image_path if Path(image_path).exists(): print(f"\n [{i+1}/{total}] {cut_id}: CACHED") continue # Determine model based on focal character focal = cut.get("focal_character") shot_type = cut.get("shot_type", "medium") ref_url = None if focal and focal in portrait_urls and shot_type not in ("establishing", "wide", "birds_eye"): model = CHARACTER_IMAGE_MODEL ref_url = portrait_urls[focal] model_label = f"{model} + portrait ref" else: model = GENERIC_IMAGE_MODEL model_label = model # Build prompt prompt = build_image_prompt(cut, CHAR_VISUALS) print(f"\n [{i+1}/{total}] {cut_id} ({shot_type}, {model_label}):") print(f" Prompt ({len(prompt)} chars): {prompt[:100]}...") t0 = time.time() try: await poll_generate_image( prompt=prompt, output_path=image_path, model=model, width=1024, height=768, seed=IMAGE_SEED, reference_image_url=ref_url, ) elapsed = time.time() - t0 size_kb = Path(image_path).stat().st_size // 1024 print(f" OK - {size_kb}KB ({elapsed:.1f}s)") cost = 0.012 if model == CHARACTER_IMAGE_MODEL else 0.0025 log_cost(f"image {cut_id}", cost) except Exception as e: print(f" FAILED: {e}") # Try fallback model fallback = "grok-imagine" if model != "grok-imagine" else "flux" print(f" Retrying with {fallback}...") try: await asyncio.sleep(3) await poll_generate_image( prompt=prompt, output_path=image_path, model=fallback, width=1024, height=768, seed=IMAGE_SEED, ) elapsed = time.time() - t0 print(f" OK (fallback) - ({elapsed:.1f}s)") cost = 0.0025 if fallback == "grok-imagine" else 0.0 log_cost(f"image {cut_id} fallback", cost) except Exception as e2: print(f" FALLBACK ALSO FAILED: {e2}") cut["_image_path"] = None await asyncio.sleep(API_DELAY) # ============================================================ # Stage 3: TTS Generation # ============================================================ async def generate_all_tts(all_cuts: list[dict]) -> None: """Generate TTS audio for all cuts.""" print("\n" + "=" * 70) print("STAGE 3: TTS GENERATION (Edge TTS - FREE)") print("=" * 70) AUDIO_DIR.mkdir(parents=True, exist_ok=True) total = len(all_cuts) for i, cut in enumerate(all_cuts): cut_id = cut["cut_id"] audio_path = str(AUDIO_DIR / f"{cut_id}.mp3") cut["_audio_path"] = audio_path dialogue = cut.get("dialogue", {}) text = dialogue.get("text") or "" if not text.strip(): print(f" [{i+1}/{total}] {cut_id}: No dialogue, skipping TTS") cut["_audio_path"] = None cut["_tts_duration"] = 0.0 continue if Path(audio_path).exists(): dur = await get_duration(audio_path) cut["_tts_duration"] = dur print(f" [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)") continue speaker = dialogue.get("speaker", "Narrator") voice_config = VOICE_CONFIGS.get(speaker, VOICE_CONFIGS["Narrator"]) emotion = dialogue.get("emotion", "neutral") print(f" [{i+1}/{total}] {cut_id}: {speaker} ({emotion}) - \"{text[:60]}...\"") t0 = time.time() try: result = await generate_tts( text=text, output_path=audio_path, voice_name=voice_config["voice_name"], rate=voice_config.get("rate", "+0%"), pitch=voice_config.get("pitch", "+0Hz"), emotion=emotion, ) dur = result["duration_sec"] # Fallback: measure with ffprobe if timestamps failed if dur < 0.1: dur = await get_duration(audio_path) cut["_tts_duration"] = dur elapsed = time.time() - t0 print(f" OK - {dur:.2f}s ({elapsed:.1f}s)") except Exception as e: print(f" FAILED: {e}") cut["_audio_path"] = None cut["_tts_duration"] = 0.0 # ============================================================ # Stage 4: Video Generation (img2vid) # ============================================================ async def generate_all_videos(all_cuts: list[dict]) -> None: """Generate video clips for all cuts via grok-video img2vid.""" print("\n" + "=" * 70) print("STAGE 4: VIDEO GENERATION (grok-video img2vid)") print("This will take 20-30 minutes. Each clip needs 2-3 min to generate.") print("=" * 70) VIDEO_DIR.mkdir(parents=True, exist_ok=True) total = len(all_cuts) # We need to upload each image first, then generate video for i, cut in enumerate(all_cuts): cut_id = cut["cut_id"] image_path = cut.get("_image_path") silent_path = str(VIDEO_DIR / f"{cut_id}_silent.mp4") cut["_silent_video_path"] = silent_path if not image_path or not Path(image_path).exists(): print(f"\n [{i+1}/{total}] {cut_id}: SKIPPED (no keyframe image)") cut["_silent_video_path"] = None continue if Path(silent_path).exists(): dur = await get_duration(silent_path) print(f"\n [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)") continue # Calculate video duration from TTS tts_dur = cut.get("_tts_duration", 0.0) storyboard_dur = cut.get("duration_sec", 3.0) # Use TTS duration if available, otherwise storyboard estimate target_dur = tts_dur if tts_dur > 0.5 else storyboard_dur # Clamp to grok-video limits (1-10s) video_dur = max(1, min(math.ceil(target_dur), 10)) # Build video prompt video_prompt_parts = [] if cut.get("video_prompt"): video_prompt_parts.append(cut["video_prompt"]) if cut.get("action_description"): video_prompt_parts.append(cut["action_description"]) video_prompt = ", ".join(video_prompt_parts) if video_prompt_parts else "subtle idle animation" print(f"\n [{i+1}/{total}] {cut_id} (requesting {video_dur}s, TTS={tts_dur:.1f}s):") print(f" Video prompt: {video_prompt[:100]}...") # Upload keyframe t0 = time.time() print(f" Uploading keyframe...") try: image_url = await upload_media(image_path) except Exception as e: print(f" Upload FAILED: {e}") cut["_silent_video_path"] = None continue # Generate video print(f" Generating video (this takes 2-3 minutes)...") try: await generate_video( prompt=video_prompt, output_path=silent_path, model=VIDEO_MODEL, duration=video_dur, image_url=image_url, ) elapsed = time.time() - t0 file_size = Path(silent_path).stat().st_size actual_dur = await get_duration(silent_path) print(f" OK - {file_size // 1024}KB, {actual_dur:.2f}s actual ({elapsed:.1f}s)") log_cost(f"video {cut_id} ({video_dur}s)", 0.003 * video_dur) except Exception as e: print(f" FAILED: {e}") cut["_silent_video_path"] = None await asyncio.sleep(API_DELAY) # ============================================================ # Stage 5: Audio Mux # ============================================================ async def mux_all_audio(all_cuts: list[dict]) -> None: """Mux TTS audio into each video clip.""" print("\n" + "=" * 70) print("STAGE 5: AUDIO MUX (FFmpeg)") print("=" * 70) CLIP_DIR.mkdir(parents=True, exist_ok=True) total = len(all_cuts) for i, cut in enumerate(all_cuts): cut_id = cut["cut_id"] silent_path = cut.get("_silent_video_path") audio_path = cut.get("_audio_path") clip_path = str(CLIP_DIR / f"{cut_id}.mp4") cut["_clip_path"] = clip_path if Path(clip_path).exists(): dur = await get_duration(clip_path) print(f" [{i+1}/{total}] {cut_id}: CACHED ({dur:.2f}s)") continue if not silent_path or not Path(silent_path).exists(): print(f" [{i+1}/{total}] {cut_id}: SKIPPED (no video)") cut["_clip_path"] = None continue video_dur = await get_duration(silent_path) audio_dur = await get_duration(audio_path) if audio_path and Path(audio_path).exists() else 0.0 strategy = "simple remux" if audio_dur > video_dur + 0.5: slowdown = min(audio_dur / video_dur, 3.0) strategy = f"slow-mo {slowdown:.1f}x" if audio_dur > video_dur * 3.0 + 0.3: strategy += f" + freeze {audio_dur - video_dur * 3.0:.1f}s" elif video_dur > audio_dur + 0.5: strategy = "trim to audio (-shortest)" print(f" [{i+1}/{total}] {cut_id}: video={video_dur:.2f}s, audio={audio_dur:.2f}s -> {strategy}") try: tts_dur = cut.get("_tts_duration", 0.0) await mux_audio( video_path=silent_path, audio_path=audio_path, output_path=clip_path, duration_sec=tts_dur if tts_dur > 0 else None, ) final_dur = await get_duration(clip_path) print(f" OK - {final_dur:.2f}s final") except Exception as e: print(f" FAILED: {e}") cut["_clip_path"] = None # ============================================================ # Stage 6: Assembly # ============================================================ async def assemble_episode(all_cuts: list[dict], storyboard: dict) -> str: """Concatenate all clips into the final episode.""" print("\n" + "=" * 70) print("STAGE 6: FINAL ASSEMBLY") print("=" * 70) episode_path = str(OUTPUT_DIR / "episode_final.mp4") # Collect valid clips clip_paths = [] transitions = [] for cut in all_cuts: clip_path = cut.get("_clip_path") if clip_path and Path(clip_path).exists(): clip_paths.append(clip_path) # Use cut's transition_out for next boundary trans_out = cut.get("transition_out", "cut") transitions.append(trans_out) if not clip_paths: print(" ERROR: No clips to assemble!") return "" print(f" Clips: {len(clip_paths)}/{len(all_cuts)}") for cp in clip_paths: dur = await get_duration(cp) print(f" {Path(cp).stem}: {dur:.2f}s") # Transitions: first clip has no transition before it # transitions[i] = transition AFTER clip i (before clip i+1) # For concat_clips, transitions[i] = transition BEFORE clip i # Shift: transitions_for_concat[0] = scene's transition_in, rest follow transition_out of previous scene_trans_in = storyboard["scenes"][0].get("transition_in", "fade_black") trans_for_concat = [scene_trans_in] # Before first clip for j in range(len(clip_paths) - 1): trans_for_concat.append(transitions[j]) # transition_out of clip j = transition before clip j+1 print(f"\n Transitions: {trans_for_concat}") print(f" Assembling...") t0 = time.time() try: await concat_clips(clip_paths, episode_path, trans_for_concat) elapsed = time.time() - t0 final_dur = await get_duration(episode_path) final_size = Path(episode_path).stat().st_size print(f" OK - {final_dur:.2f}s, {final_size // (1024*1024):.1f}MB ({elapsed:.1f}s)") except Exception as e: print(f" xfade failed: {e}") print(f" Falling back to simple concat...") try: from app.services.ffmpeg import _concat_simple await _concat_simple(clip_paths, episode_path) final_dur = await get_duration(episode_path) final_size = Path(episode_path).stat().st_size print(f" OK (simple) - {final_dur:.2f}s, {final_size // (1024*1024):.1f}MB") except Exception as e2: print(f" Simple concat also failed: {e2}") return "" return episode_path # ============================================================ # Main # ============================================================ async def main(): global total_cost overall_start = time.time() # Create output dirs for d in [OUTPUT_DIR, PORTRAIT_DIR, IMAGE_DIR, AUDIO_DIR, VIDEO_DIR, CLIP_DIR]: d.mkdir(parents=True, exist_ok=True) print("=" * 70) print("FULL EPISODE PIPELINE TEST") print(f"Output: {OUTPUT_DIR.resolve()}") print("=" * 70) # Load storyboard storyboard_path = Path("test_storyboard_output.json") if not storyboard_path.exists(): print("ERROR: Run test_storyboard.py first to generate the storyboard") return with open(storyboard_path, "r", encoding="utf-8") as f: storyboard = json.load(f) print(f"\nEpisode: {storyboard.get('episode_title', 'Unknown')}") print(f"Arc: {storyboard.get('emotional_arc', '')[:80]}...") # Extract all cuts all_cuts = [] for scene in storyboard.get("scenes", []): for cut in scene.get("cuts", scene.get("shots", [])): all_cuts.append(cut) print(f"Total cuts: {len(all_cuts)}") total_storyboard_dur = sum(c.get("duration_sec", 3.0) for c in all_cuts) print(f"Storyboard duration: {total_storyboard_dur:.1f}s") # Summary table print(f"\n {'Cut':<6} {'Type':<18} {'Focal':<20} {'Model':<15} {'Dur':>5}") print(f" {'-'*68}") for cut in all_cuts: cut_id = cut["cut_id"] shot_type = cut.get("shot_type", "?") focal = cut.get("focal_character") or "-" model = CHARACTER_IMAGE_MODEL if focal != "-" and shot_type not in ("wide", "establishing", "birds_eye") else GENERIC_IMAGE_MODEL dur = cut.get("duration_sec", 0) print(f" {cut_id:<6} {shot_type:<18} {focal:<20} {model:<15} {dur:>4.1f}s") # ---- Run pipeline stages ---- # Stage 1: Portraits portrait_urls = await generate_portraits(storyboard) # Stage 2: Images await generate_images(all_cuts, portrait_urls) # Stage 3: TTS (fast, free) await generate_all_tts(all_cuts) # Print TTS duration summary print(f"\n TTS Duration Summary:") total_tts_dur = 0.0 for cut in all_cuts: tts_dur = cut.get("_tts_duration", 0.0) sb_dur = cut.get("duration_sec", 0.0) total_tts_dur += tts_dur ratio = tts_dur / sb_dur if sb_dur > 0 else 0 print(f" {cut['cut_id']}: TTS={tts_dur:.2f}s vs storyboard={sb_dur:.1f}s (ratio={ratio:.1f}x)") print(f" Total TTS: {total_tts_dur:.2f}s vs storyboard {total_storyboard_dur:.1f}s") # Stage 4: Video (slow — 20-30 min) await generate_all_videos(all_cuts) # Stage 5: Audio mux await mux_all_audio(all_cuts) # Stage 6: Assembly episode_path = await assemble_episode(all_cuts, storyboard) # ---- Final Summary ---- overall_elapsed = time.time() - overall_start print(f"\n{'=' * 70}") print("FINAL SUMMARY") print(f"{'=' * 70}") print(f"\n Episode: {storyboard.get('episode_title', 'Unknown')}") print(f" Cuts: {len(all_cuts)}") # Clip durations successful_clips = 0 total_clip_dur = 0.0 for cut in all_cuts: clip_path = cut.get("_clip_path") if clip_path and Path(clip_path).exists(): dur = await get_duration(clip_path) total_clip_dur += dur successful_clips += 1 print(f" Successful clips: {successful_clips}/{len(all_cuts)}") print(f" Total clip duration: {total_clip_dur:.2f}s ({total_clip_dur/60:.1f} min)") if episode_path and Path(episode_path).exists(): ep_dur = await get_duration(episode_path) ep_size = Path(episode_path).stat().st_size print(f"\n Final episode: {episode_path}") print(f" Duration: {ep_dur:.2f}s ({ep_dur/60:.1f} min)") print(f" File size: {ep_size / (1024*1024):.1f}MB") print(f"\n Total pollen spent: {total_cost:.4f}") print(f" Wall time: {overall_elapsed:.0f}s ({overall_elapsed/60:.1f} min)") print(f"\n Output directory: {OUTPUT_DIR.resolve()}") # Failed cuts failed = [cut["cut_id"] for cut in all_cuts if not cut.get("_clip_path") or not Path(cut.get("_clip_path", "")).exists()] if failed: print(f"\n FAILED CUTS: {failed}") print(f" Re-run this script to retry failed cuts (cached steps will be skipped)") if episode_path: print(f"\n Play the final episode:") print(f" {Path(episode_path).resolve()}") if __name__ == "__main__": asyncio.run(main())