Spaces:
Sleeping
Sleeping
| """End-to-end pipeline test for a single cut (C01). | |
| Tests the full flow: storyboard → image prompt assembly → image gen → | |
| TTS → video gen (img2vid) → audio mux → final clip with voice. | |
| Cost: ~0.006 pollen (1 image + 1 video clip) | |
| """ | |
| import asyncio | |
| import json | |
| import math | |
| import sys | |
| import time | |
| from pathlib import Path | |
| sys.path.insert(0, ".") | |
| from app.utils.prompt_builder import build_image_prompt, _build_identity_anchor | |
| from app.services.pollinations import generate_image, generate_video, upload_media | |
| from app.services.tts import generate_tts | |
| from app.services.ffmpeg import mux_audio, get_duration | |
# Directory under which main() writes the keyframe, voice track and clips.
OUTPUT_DIR = Path("test_cut01_output")

# Characters (same as test_storyboard.py) — name -> visual_prompt.
# Traits are listed one per entry and joined with ", " to form each prompt.
_CHARACTER_TRAITS = {
    "Ye Chen": (
        "young adult male",
        "jet black short messy hair with side-swept bangs",
        "deep amber eyes",
        "fair skin",
        "athletic build",
        "wearing white inner sect disciple robes with silver trim",
        "determined intense expression",
    ),
    "Gu Changge": (
        "young adult male",
        "long flowing silver-white hair",
        "cold piercing violet eyes",
        "pale ivory skin",
        "tall elegant build",
        "wearing luxurious dark purple and gold noble robes with intricate embroidery",
        "calm indifferent expression",
    ),
    "Taixuan Holy Lord": (
        "middle-aged male",
        "dark brown hair tied in a topknot",
        "golden glowing eyes",
        "warm bronze skin",
        "imposing muscular build",
        "wearing ornate golden and white holy lord ceremonial robes with dragon motifs",
        "stern authoritative expression",
    ),
    "Holy Maiden": (
        "young adult female",
        "long flowing black hair with jade hairpin",
        "autumn water-like gentle brown eyes",
        "porcelain skin",
        "slim graceful build",
        "wearing elegant green silk dress with fluttering sleeves",
        "ethereal calm expression like a fairy",
    ),
}
CHARACTERS = {
    name: ", ".join(traits) for name, traits in _CHARACTER_TRAITS.items()
}

# Voice configs: speaker name -> keyword settings passed to generate_tts().
# main() falls back to the "Narrator" entry for speakers not listed here.
VOICE_CONFIGS = {
    "Ye Chen": dict(voice_name="en-US-AndrewNeural", rate="+5%", pitch="+2Hz"),
    "Narrator": dict(voice_name="en-US-GuyNeural", rate="-5%", pitch="-5Hz"),
    "Taixuan Holy Lord": dict(
        voice_name="en-US-RogerNeural", rate="-10%", pitch="-10Hz"
    ),
}
def _find_cut(storyboard, cut_id):
    """Return the first cut whose ``cut_id`` equals *cut_id*, or ``None``.

    Each scene's cuts are read from its "cuts" key, falling back to "shots"
    (and then to an empty list) when "cuts" is absent.
    """
    for scene in storyboard["scenes"]:
        for cut in scene.get("cuts", scene.get("shots", [])):
            if cut.get("cut_id") == cut_id:
                return cut
    return None


async def main() -> None:
    """Run the single-cut (C01) pipeline end to end and print a report.

    Steps, in order: load ``test_storyboard_output.json``, assemble the image
    prompt, generate the keyframe, generate the TTS voice track, generate the
    silent img2vid clip, then mux voice and video into the final clip. All
    artifacts are written under ``OUTPUT_DIR``. The keyframe, voice track and
    silent clip are reused when their files already exist; the final mux
    always runs.

    Returns early (after printing a message) when the storyboard file is
    missing, when cut C01 is not in it, or when every image model fails.
    Errors from TTS, upload, video generation and muxing are not caught here
    and propagate to the caller.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # ─── Step 1: Load storyboard ───
    print("=" * 70)
    print("CUT 01 END-TO-END PIPELINE TEST")
    print("=" * 70)

    storyboard_path = Path("test_storyboard_output.json")
    if not storyboard_path.exists():
        print("ERROR: Run test_storyboard.py first to generate the storyboard")
        return
    with open(storyboard_path, "r", encoding="utf-8") as f:
        storyboard = json.load(f)

    # Extract C01
    cut01 = _find_cut(storyboard, "C01")
    if not cut01:
        print("ERROR: C01 not found in storyboard")
        return

    # NOTE(review): the fields below are indexed directly, so a cut missing
    # any of them (including "dialogue") raises here — confirm the schema.
    print(f"\nCut: {cut01['cut_id']}")
    print(f"Shot type: {cut01['shot_type']}")
    print(f"Camera: {cut01['camera_movement']}")
    print(f"Duration: {cut01['duration_sec']}s")
    print(f"Characters: {cut01['characters_present']}")
    print(f"Focal: {cut01['focal_character']}")
    print(f"Speaker: {cut01['dialogue']['speaker']}")
    print(f"Dialogue: {cut01['dialogue']['text'][:80]}...")

    # ─── Step 2: Assemble image prompt ───
    print(f"\n{'-' * 70}")
    print("STEP 2: Image Prompt Assembly")
    print(f"{'-' * 70}")

    image_prompt = build_image_prompt(cut01, CHARACTERS)
    print(f"\nAssembled prompt ({len(image_prompt)} chars):")
    print(f"\n{image_prompt}")

    # Show identity anchor for reference (known characters only).
    for char_name in cut01["characters_present"]:
        if char_name in CHARACTERS:
            anchor = _build_identity_anchor(CHARACTERS[char_name])
            print(f"\nIdentity anchor for {char_name}: {anchor}")

    # ─── Step 3: Generate image ───
    print(f"\n{'-' * 70}")
    print("STEP 3: Image Generation (imagen-4)")
    print(f"{'-' * 70}")

    image_path = str(OUTPUT_DIR / "C01_keyframe.png")
    # Model priority: try imagen-4, fallback to grok-imagine, then flux
    image_models = ["imagen-4", "grok-imagine", "flux"]

    if Path(image_path).exists():
        print(f" CACHED: {image_path}")
    else:
        for model in image_models:
            t0 = time.time()
            print(f" Trying {model}...")
            try:
                await generate_image(
                    prompt=image_prompt,
                    output_path=image_path,
                    model=model,
                    width=1024,
                    height=768,
                )
            except Exception as e:
                # Any failure from this model falls through to the next one;
                # only the last model's failure aborts the run.
                print(f" FAILED ({model}): {e}")
                if model == image_models[-1]:
                    print(" All models failed!")
                    return
                print(" Trying next model...")
                await asyncio.sleep(3)
            else:
                elapsed = time.time() - t0
                print(f" OK — {model}, saved to {image_path} ({elapsed:.1f}s)")
                break

    # ─── Step 4: Generate TTS ───
    print(f"\n{'-' * 70}")
    print("STEP 4: TTS Generation (Edge TTS)")
    print(f"{'-' * 70}")

    audio_path = str(OUTPUT_DIR / "C01_voice.mp3")
    dialogue = cut01["dialogue"]
    speaker = dialogue["speaker"]
    # Speakers without their own entry use the Narrator voice.
    voice_config = VOICE_CONFIGS.get(speaker, VOICE_CONFIGS["Narrator"])

    if Path(audio_path).exists():
        print(f" CACHED: {audio_path}")
        # No TTS result is available for a cached file, so probe its length.
        tts_duration = await get_duration(audio_path)
    else:
        t0 = time.time()
        print(f" Speaker: {speaker}")
        print(f" Voice: {voice_config['voice_name']}")
        print(f" Emotion: {dialogue.get('emotion', 'neutral')}")
        print(f" Text: {dialogue['text'][:100]}...")
        tts_result = await generate_tts(
            text=dialogue["text"],
            output_path=audio_path,
            voice_name=voice_config["voice_name"],
            rate=voice_config.get("rate", "+0%"),
            pitch=voice_config.get("pitch", "+0Hz"),
            emotion=dialogue.get("emotion", "neutral"),
        )
        tts_duration = tts_result["duration_sec"]
        elapsed = time.time() - t0
        print(f" OK — {tts_duration:.2f}s audio ({elapsed:.1f}s)")
        print(f" Word timestamps: {len(tts_result.get('word_timestamps', []))} words")

    print(f"\n TTS duration: {tts_duration:.2f}s")
    print(f" Storyboard duration: {cut01['duration_sec']}s")

    # ─── Step 5: Generate video (img2vid) ───
    print(f"\n{'-' * 70}")
    print("STEP 5: Video Generation (grok-video img2vid)")
    print(f"{'-' * 70}")

    silent_video_path = str(OUTPUT_DIR / "C01_silent.mp4")
    # Round the voice length up to whole seconds and clamp it to 1..10
    # (presumably the range the video backend accepts — TODO confirm).
    video_duration = max(1, min(math.ceil(tts_duration), 10))

    # Build video prompt from whichever optional fields the cut provides.
    video_prompt_parts = [
        cut01[key]
        for key in ("video_prompt", "action_description")
        if cut01.get(key)
    ]
    video_prompt = (
        ", ".join(video_prompt_parts)
        if video_prompt_parts
        else "subtle idle animation"
    )
    print(f" Video prompt: {video_prompt[:120]}...")
    print(f" Video duration: {video_duration}s (ceil of {tts_duration:.2f}s TTS)")

    if Path(silent_video_path).exists():
        print(f" CACHED (silent): {silent_video_path}")
    else:
        # Upload keyframe image; the video call is given its URL, not the file.
        t0 = time.time()
        print(" Uploading keyframe to media.pollinations.ai...")
        image_url = await upload_media(image_path)
        print(f" Image URL: {image_url[:80]}...")
        print(" Generating video via grok-video (this takes 2-3 minutes)...")
        await generate_video(
            prompt=video_prompt,
            output_path=silent_video_path,
            model="grok-video",
            duration=video_duration,
            image_url=image_url,
        )
        elapsed = time.time() - t0
        file_size = Path(silent_video_path).stat().st_size
        print(f" OK — {file_size // 1024}KB silent video ({elapsed:.1f}s)")

    # ─── Step 6: Mux audio into video ───
    print(f"\n{'-' * 70}")
    print("STEP 6: Audio Mux (FFmpeg)")
    print(f"{'-' * 70}")

    final_clip_path = str(OUTPUT_DIR / "C01_final.mp4")
    t0 = time.time()
    video_dur = await get_duration(silent_video_path)
    audio_dur = await get_duration(audio_path)
    print(f" Silent video: {video_dur:.2f}s")
    print(f" TTS audio: {audio_dur:.2f}s")
    print(f" Mismatch: {abs(video_dur - audio_dur):.2f}s")

    # Informational only: this reports the expected strategy; the actual
    # handling is decided inside mux_audio().
    if audio_dur > video_dur + 0.5:
        print(f" Strategy: freeze last frame + pad {audio_dur - video_dur:.1f}s")
    elif video_dur > audio_dur:
        print(" Strategy: trim video to match audio (-shortest)")
    else:
        print(" Strategy: durations match, simple remux")

    await mux_audio(
        video_path=silent_video_path,
        audio_path=audio_path,
        output_path=final_clip_path,
        duration_sec=tts_duration,
    )
    elapsed = time.time() - t0
    final_dur = await get_duration(final_clip_path)
    final_size = Path(final_clip_path).stat().st_size
    print(f" OK — final clip: {final_dur:.2f}s, {final_size // 1024}KB ({elapsed:.1f}s)")

    # ─── Summary ───
    print(f"\n{'=' * 70}")
    print("RESULTS")
    print(f"{'=' * 70}")
    print(f" Keyframe image: {image_path}")
    print(f" TTS audio: {audio_path} ({audio_dur:.2f}s)")
    print(f" Silent video: {silent_video_path} ({video_dur:.2f}s)")
    print(f" Final clip: {final_clip_path} ({final_dur:.2f}s)")

    # Derive the total from the same rates the message quotes, so it stays
    # correct for clips longer than one second. It is an estimate for a
    # full (uncached) run.
    pollen_cost = 0.0025 + 0.003 * video_duration
    print(
        f"\n Pollen cost: ~{pollen_cost:.4f} "
        f"(0.0025 image + 0.003/s × {video_duration}s video)"
    )
    print(f"\n Open {OUTPUT_DIR.resolve()} to review the outputs!")
    print(f" Play {final_clip_path} to see the final result with voice!")
# Script entry point: run the async pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())