video.py (6539B)
"""Generate video with animated disks from bass_circles clips.

Usage:
    python -m gondry.video projects/bass_circles
"""

import csv
import random
import subprocess
import sys
from pathlib import Path

from PIL import Image, ImageDraw

from .parser import parse_header, parse_gondry, VoiceHeader


def _collect_note_durations(header, partition_dir, fps):
    """Return a ``{(start_frame, midi_note): duration_frames}`` map.

    Every ``*.flsp`` file in *partition_dir* whose stem matches the ``name``
    of a channel in ``header.channels`` is parsed with ``parse_gondry``;
    files without a matching channel are skipped.
    """
    sec_per_beat = 60.0 / header.bpm
    sec_per_frame = 1.0 / fps
    note_durations = {}

    for gondry_path in sorted(partition_dir.glob("*.flsp")):
        voice_name = gondry_path.stem
        channel_config = next(
            (ch for ch in header.channels if ch["name"] == voice_name),
            None,
        )
        if channel_config is None:
            continue

        voice_header = VoiceHeader(
            project=header.project,
            bpm=header.bpm,
            octave=header.octave,
            mode=header.mode,
            channels=header.channels,
            channel_name=voice_name,
            channel_config=channel_config,
        )

        for ev in parse_gondry(gondry_path, voice_header):
            if not ev.midi_notes:
                continue
            start_sec = ev.beat_position * sec_per_beat
            duration_sec = ev.duration_beats * sec_per_beat
            # NOTE(review): int() truncates, so a float result such as
            # 11.999999 becomes frame 11. These keys are looked up with the
            # row index of the clip CSVs; confirm the clip generator
            # computes its frame indices the same way.
            start_frame = int(start_sec / sec_per_frame)
            duration_frames = int(duration_sec / sec_per_frame)

            for note in ev.midi_notes:
                note_durations[(start_frame, note)] = duration_frames

    return note_durations


def _load_clip_frames(clip_dir):
    """Read all ``bar_*.csv`` clips into a list of ``{voice: [notes]}`` dicts.

    Clips are read in sorted filename order. Each file is ``;``-delimited:
    the first row holds the voice names (its first column is ignored), and
    every following row is one frame whose cells are comma-separated
    integer notes (an empty cell means no notes for that voice).
    """
    all_frames = []
    for clip_path in sorted(clip_dir.glob("bar_*.csv")):
        # newline="" is what the csv module expects for files it reads.
        with open(clip_path, "r", newline="") as f:
            reader = csv.reader(f, delimiter=";")
            header_row = next(reader, None)
            if header_row is None:
                continue  # completely empty clip file
            voice_names = header_row[1:]

            for row in reader:
                if not row:
                    continue  # blank line, not a frame
                frame_data = {}
                for col, voice_name in enumerate(voice_names, start=1):
                    # A row shorter than the header is treated as having
                    # empty cells for the missing voices.
                    notes_str = row[col] if col < len(row) else ""
                    if notes_str:
                        frame_data[voice_name] = [
                            int(n) for n in notes_str.split(",")
                        ]
                    else:
                        frame_data[voice_name] = []
                all_frames.append(frame_data)

    return all_frames


def _render_frames(all_frames, note_durations, frames_dir, width, height, fps):
    """Draw one PNG per frame into *frames_dir* (``frame_NNNN.png``).

    A note that appears in a frame while not already active spawns a disk at
    a random position around the image centre; the disk fades out linearly
    over the note's duration and is dropped once that duration has elapsed.
    """
    # Remove frames left by a previous (possibly longer) render, otherwise
    # ffmpeg would append them after the frames written below.
    for stale in frames_dir.glob("frame_*.png"):
        stale.unlink()

    colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]  # red, green, blue
    color_idx = 0

    disk_radius = height // 5
    center_x, center_y = width // 2, height // 2
    spread = min(width, height) // 4

    active_notes = {}  # note -> {start_frame, duration, color, x, y}
    total = len(all_frames)

    for frame_idx, frame_data in enumerate(all_frames):
        img = Image.new("RGB", (width, height), (0, 0, 0))
        # "RGBA" draw mode blends the fill's alpha into the RGB image; with
        # the default mode the alpha component is ignored and the disks
        # would never fade.
        draw = ImageDraw.Draw(img, "RGBA")

        current_notes = set()
        for notes in frame_data.values():
            current_notes.update(notes)

        # Spawn a disk for every note that is not already on screen.
        for note in current_notes:
            if note in active_notes:
                continue
            # Fall back to one second (fps frames) when the partition has
            # no event starting exactly at this frame for this note.
            duration = note_durations.get((frame_idx, note), fps)
            x = center_x + random.randint(-spread, spread)
            y = center_y + random.randint(-spread, spread)
            active_notes[note] = {
                "start_frame": frame_idx,
                "duration": duration,
                "color": colors[color_idx % len(colors)],
                "x": x,
                "y": y,
            }
            color_idx += 1

        expired = []
        for note, props in active_notes.items():
            elapsed = frame_idx - props["start_frame"]
            duration = props["duration"]

            # Also guards the division below: elapsed >= 0, so a duration
            # of zero always lands here.
            if elapsed >= duration:
                expired.append(note)
                continue

            fade = 1.0 - (elapsed / duration)  # 1.0 -> 0.0 over the note
            opacity = int(255 * fade)
            r, g, b = props["color"]
            x, y = props["x"], props["y"]
            draw.ellipse(
                [x - disk_radius, y - disk_radius,
                 x + disk_radius, y + disk_radius],
                fill=(r, g, b, opacity),
            )

        # Deleted after the loop: a dict must not change size while iterated.
        for note in expired:
            del active_notes[note]

        img.save(frames_dir / f"frame_{frame_idx:04d}.png")

        if frame_idx % 100 == 0:
            print(f" Frame {frame_idx}/{total}")


def _encode_video(frames_dir, video_path, fps):
    """Encode the PNG sequence with ffmpeg; return *video_path* or ``None``."""
    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(fps),
        "-i", str(frames_dir / "frame_%04d.png"),
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "-crf", "23",
        str(video_path),
    ]

    print("\nGenerating video...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        # Raised when the ffmpeg executable itself cannot be found; report
        # it the same way as any other ffmpeg failure.
        print("FFmpeg error: ffmpeg executable not found")
        return None

    if result.returncode != 0:
        print(f"FFmpeg error: {result.stderr[-500:]}")
        return None

    print(f"Video saved to {video_path}")
    return video_path


def generate_video(project_dir: str, width: int = 1920, height: int = 1080):
    """Generate video with animated disks from clip CSVs.

    Reads ``<project_dir>/partition/header.json`` and ``*.flsp`` files for
    note durations, ``<project_dir>/clip/bar_*.csv`` for per-frame notes,
    writes PNG frames to ``<project_dir>/render/frames`` and encodes them to
    ``<project_dir>/render/bass_circles.mp4`` with ffmpeg.

    Args:
        project_dir: Project directory containing ``clip`` and ``partition``.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        The ``Path`` of the generated video, or ``None`` when there are no
        clip frames or ffmpeg fails.
    """
    project = Path(project_dir)
    clip_dir = project / "clip"
    partition_dir = project / "partition"
    render_dir = project / "render"
    render_dir.mkdir(exist_ok=True)

    header = parse_header(partition_dir / "header.json")
    bpm = header.bpm

    # Calculate fps from BPM
    from .fps import bpm_to_fps
    fps_info = bpm_to_fps(bpm)
    fps = int(fps_info["fps"])
    frames_per_beat = fps_info["frames_per_beat"]

    print(f"BPM: {bpm} | FPS: {fps} | Frames/beat: {frames_per_beat}")

    note_durations = _collect_note_durations(header, partition_dir, fps)
    print(f"Note events: {len(note_durations)}")

    all_frames = _load_clip_frames(clip_dir)
    print(f"Total frames: {len(all_frames)}")

    if not all_frames:
        # Nothing to draw; bail out before touching the frames directory.
        print(f"No clip frames found in {clip_dir}")
        return None

    frames_dir = render_dir / "frames"
    frames_dir.mkdir(exist_ok=True)
    _render_frames(all_frames, note_durations, frames_dir, width, height, fps)

    print(f"\nFrames saved to {frames_dir}")

    return _encode_video(frames_dir, render_dir / "bass_circles.mp4", fps)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python -m gondry.video <project_dir>")
        sys.exit(1)

    generate_video(sys.argv[1])