Spaces:
Sleeping
Sleeping
| """Phase 2: Pollinations Video Model Test (img2vid). | |
| Takes an image (from Phase 1 or provided), uploads it to media.pollinations.ai, | |
| then generates a 5-second video via grok-video img2vid. | |
| Cost: ~0.015 pollen (5 seconds × $0.003/sec) | |
| Usage: | |
| python test_pollinations_video.py [path_to_image] | |
| If no image path is provided, uses the best Phase 1 image (imagen-4.png). | |
| """ | |
| import asyncio | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import httpx | |
| API_KEY = "pk_Qh43XHjquSeBBiqm" | |
| BASE_URL = "https://gen.pollinations.ai" | |
| MEDIA_URL = "https://media.pollinations.ai" | |
| OUTPUT_DIR = Path("test_pollinations_output") | |
| # Motion prompts to test | |
| MOTION_PROMPTS = [ | |
| ( | |
| "character_turn", | |
| "character slowly turns head, wind blowing through hair, dramatic camera zoom in, cinematic", | |
| ), | |
| ( | |
| "breathing_idle", | |
| "subtle breathing motion, hair gently swaying, atmospheric particles floating, slight camera drift", | |
| ), | |
| ] | |
| async def upload_image(file_path: str) -> str: | |
| """Upload an image to media.pollinations.ai and return the URL.""" | |
| path = Path(file_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Image not found: {file_path}") | |
| print(f" Uploading {path.name} to media.pollinations.ai...") | |
| async with httpx.AsyncClient(timeout=120.0) as client: | |
| with open(file_path, "rb") as f: | |
| files = {"file": (path.name, f, "image/png")} | |
| resp = await client.post( | |
| f"{MEDIA_URL}/upload", | |
| files=files, | |
| headers={"Authorization": f"Bearer {API_KEY}"}, | |
| ) | |
| print(f" Upload status: {resp.status_code}") | |
| print(f" Upload response: {resp.text[:500]}") | |
| resp.raise_for_status() | |
| # Try to extract URL from response | |
| content_type = resp.headers.get("content-type", "") | |
| if "json" in content_type: | |
| data = resp.json() | |
| url = ( | |
| data.get("url") | |
| or data.get("image_url") | |
| or data.get("media_url") | |
| or data.get("link", "") | |
| ) | |
| if url: | |
| return url | |
| text = resp.text.strip() | |
| if text.startswith("http"): | |
| return text | |
| raise ValueError(f"Could not extract URL: {resp.text[:200]}") | |
| async def generate_video( | |
| prompt: str, | |
| output_path: Path, | |
| image_url: str | None = None, | |
| duration: int = 5, | |
| ) -> dict: | |
| """Generate a video via Pollinations grok-video.""" | |
| import urllib.parse | |
| encoded_prompt = urllib.parse.quote(prompt, safe="") | |
| url = f"{BASE_URL}/video/{encoded_prompt}" | |
| params = { | |
| "model": "grok-video", | |
| "duration": duration, | |
| "aspectRatio": "16:9", | |
| "nologo": "true", | |
| "key": API_KEY, | |
| } | |
| if image_url: | |
| params["image"] = image_url | |
| start = time.time() | |
| try: | |
| async with httpx.AsyncClient(timeout=300.0, follow_redirects=True) as client: | |
| print(f" Requesting video ({duration}s)...") | |
| resp = await client.get( | |
| url, | |
| params=params, | |
| headers={"Authorization": f"Bearer {API_KEY}"}, | |
| ) | |
| elapsed = time.time() - start | |
| if resp.status_code != 200: | |
| return { | |
| "status": "error", | |
| "error": f"HTTP {resp.status_code}: {resp.text[:300]}", | |
| "time": elapsed, | |
| } | |
| if len(resp.content) < 5000: | |
| return { | |
| "status": "error", | |
| "error": f"Response too small ({len(resp.content)} bytes): {resp.text[:200]}", | |
| "time": elapsed, | |
| } | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| with open(str(output_path), "wb") as f: | |
| f.write(resp.content) | |
| return { | |
| "status": "ok", | |
| "file_size": len(resp.content), | |
| "time": elapsed, | |
| "path": str(output_path), | |
| } | |
| except Exception as e: | |
| elapsed = time.time() - start | |
| return { | |
| "status": "error", | |
| "error": str(e), | |
| "time": elapsed, | |
| } | |
| async def main(): | |
| OUTPUT_DIR.mkdir(parents=True, exist_ok=True) | |
| # Determine source image | |
| if len(sys.argv) > 1: | |
| image_path = sys.argv[1] | |
| else: | |
| # Try Phase 1 outputs in order of expected quality | |
| candidates = [ | |
| OUTPUT_DIR / "imagen-4.png", | |
| OUTPUT_DIR / "grok-imagine.png", | |
| OUTPUT_DIR / "klein-large.png", | |
| OUTPUT_DIR / "klein.png", | |
| OUTPUT_DIR / "flux.png", | |
| ] | |
| image_path = None | |
| for c in candidates: | |
| if c.exists(): | |
| image_path = str(c) | |
| break | |
| if not image_path: | |
| print("ERROR: No source image found. Run test_pollinations_images.py first,") | |
| print(" or provide an image path: python test_pollinations_video.py image.png") | |
| return | |
| print("=" * 70) | |
| print("POLLINATIONS VIDEO MODEL TEST (img2vid)") | |
| print(f"Source image: {image_path}") | |
| print(f"Output: {OUTPUT_DIR}/") | |
| print("=" * 70) | |
| # Step 1: Upload image | |
| print("\n[1] UPLOADING SOURCE IMAGE") | |
| try: | |
| image_url = await upload_image(image_path) | |
| print(f" Upload URL: {image_url}") | |
| except Exception as e: | |
| print(f" Upload FAILED: {e}") | |
| print("\n Trying without upload (text-only video generation)...") | |
| image_url = None | |
| # Step 2: Generate videos with different motion prompts | |
| total_cost = 0.0 | |
| results = [] | |
| for tag, prompt in MOTION_PROMPTS: | |
| print(f"\n[2] VIDEO: {tag}") | |
| print(f" Prompt: {prompt[:80]}...") | |
| output_path = OUTPUT_DIR / f"video_{tag}.mp4" | |
| if image_url: | |
| result = await generate_video(prompt, output_path, image_url=image_url, duration=5) | |
| else: | |
| result = await generate_video(prompt, output_path, duration=5) | |
| results.append((tag, result)) | |
| total_cost += 5 * 0.003 # 5 seconds × $0.003/sec | |
| if result["status"] == "ok": | |
| size_mb = result["file_size"] / (1024 * 1024) | |
| print(f" OK — {size_mb:.1f}MB, {result['time']:.1f}s generation time") | |
| print(f" Saved: {result['path']}") | |
| else: | |
| print(f" FAILED — {result['error']}") | |
| # Step 3: Test text-only video (no reference image) for comparison | |
| print(f"\n[3] VIDEO: text_only (no reference image)") | |
| text_prompt = ( | |
| "anime style, young man with black hair and red eyes in military coat, " | |
| "slowly turns head, dramatic lighting, dark throne room, cinematic camera movement" | |
| ) | |
| output_path = OUTPUT_DIR / "video_text_only.mp4" | |
| result = await generate_video(text_prompt, output_path, duration=5) | |
| results.append(("text_only", result)) | |
| total_cost += 5 * 0.003 | |
| if result["status"] == "ok": | |
| size_mb = result["file_size"] / (1024 * 1024) | |
| print(f" OK — {size_mb:.1f}MB, {result['time']:.1f}s generation time") | |
| else: | |
| print(f" FAILED — {result['error']}") | |
| # Summary | |
| print("\n" + "=" * 70) | |
| print("SUMMARY") | |
| print("=" * 70) | |
| print(f"{'Test':<20} {'Status':<8} {'Time':>8} {'Size':>10}") | |
| print("-" * 50) | |
| for tag, r in results: | |
| status = r["status"] | |
| t = f"{r['time']:.1f}s" | |
| size = f"{r.get('file_size', 0) // 1024}KB" if r["status"] == "ok" else "—" | |
| print(f"{tag:<20} {status:<8} {t:>8} {size:>10}") | |
| print(f"\nTotal pollen spent: {total_cost:.4f}") | |
| print(f"\nIMPORTANT: Watch the videos to evaluate:") | |
| print(" 1. Does img2vid maintain the character's appearance?") | |
| print(" 2. Is the motion natural and smooth?") | |
| print(" 3. Is quality acceptable for anime-style content?") | |
| print(" 4. Compare img2vid vs text-only — which is better?") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |