API Kernel: HeapTopKCore Mechanism: Maintain a bounded collection of elements with efficient access to extreme values (min/max).
This document presents the canonical heap templates for top-k selection, streaming median, k-way merge, and greedy scheduling problems. Each implementation follows consistent naming conventions and includes detailed logic explanations.
A heap is a complete binary tree satisfying the heap property: - Min-Heap: Parent β€ children (root is minimum) - Max-Heap: Parent β₯ children (root is maximum)
defheap_top_k(elements:Iterable[T],k:int,key:Callable=None)->List[T]:""" Find top-k elements efficiently. Strategy: Maintain a min-heap of size k. - Elements larger than heap root replace the root - After processing, heap contains k largest elements Time: O(n log k), Space: O(k) Why min-heap for max problem? - Min-heap root is the smallest of the k largest - Easy to check if new element should enter the set - If new_element > root, it belongs in top-k; evict root """min_heap=[]forelementinelements:val=key(element)ifkeyelseelementiflen(min_heap)<k:heapq.heappush(min_heap,(val,element))elifval>min_heap[0][0]:heapq.heapreplace(min_heap,(val,element))return[item[1]foriteminmin_heap]
2. Base Template: Kth Largest Element (LeetCode 215)¶
Problem: Find the kth largest element in an unsorted array. Invariant: Min-heap of size k contains the k largest elements; root is the kth largest. Role: BASE TEMPLATE for HeapTopK API Kernel.
# Pattern: heap_kth_element# See: docs/patterns/heap/templates.md Section 1 (Base Template)classSolutionHeap:deffindKthLargest(self,nums:List[int],k:int)->int:""" Find kth largest using min-heap of size k. Key Insight: - Maintain k largest elements seen so far in a min-heap - Root of min-heap = smallest of the k largest = kth largest Why min-heap for max problem? - When new element > root, it belongs in top-k - Replace root (smallest of k largest) with new element - After n elements, root is exactly the kth largest """importheapq# Min-heap stores k largest elementsmin_heap:list[int]=[]fornuminnums:iflen(min_heap)<k:# Phase 1: Fill heap until size kheapq.heappush(min_heap,num)elifnum>min_heap[0]:# Phase 2: New element larger than kth largest# Replace the current kth largestheapq.heapreplace(min_heap,num)# Root is the kth largest elementreturnmin_heap[0]
classSolutionQuickselect:deffindKthLargest(self,nums:List[int],k:int)->int:""" Quickselect algorithm: O(n) average, O(nΒ²) worst. Partition-based selection without full sorting. Use random pivot to avoid worst-case on sorted input. """importrandomtarget_idx=k-1# kth largest at index k-1 in descending orderdefpartition(left:int,right:int)->int:# Random pivot to avoid O(nΒ²) on sorted arrayspivot_idx=random.randint(left,right)pivot_val=nums[pivot_idx]nums[pivot_idx],nums[right]=nums[right],nums[pivot_idx]store_idx=leftforiinrange(left,right):ifnums[i]>=pivot_val:# >= for descending ordernums[store_idx],nums[i]=nums[i],nums[store_idx]store_idx+=1nums[store_idx],nums[right]=nums[right],nums[store_idx]returnstore_idxleft,right=0,len(nums)-1whileleft<=right:pivot_idx=partition(left,right)ifpivot_idx==target_idx:returnnums[pivot_idx]elifpivot_idx<target_idx:left=pivot_idx+1else:right=pivot_idx-1returnnums[left]
Problem: Given an integer array, return the k most frequent elements. Variant: heap_top_k with frequency as sort key Delta from 215: Sort by frequency instead of value; bucket sort alternative.
# Pattern: heap_top_k# See: docs/patterns/heap/templates.md Section 2fromcollectionsimportCounterimportheapqclassSolutionHeap:deftopKFrequent(self,nums:List[int],k:int)->List[int]:""" Find k most frequent elements using min-heap. Two-Phase Approach: 1. Count frequencies: O(n) 2. Top-k selection: O(m log k) where m = unique elements Why min-heap for max-frequency? - Store (frequency, element) tuples - Heap root = smallest frequency among top-k - Elements with higher frequency replace root """# Phase 1: Count frequenciesfrequency_map:dict[int,int]=Counter(nums)# Phase 2: Maintain min-heap of size kmin_heap:list[tuple[int,int]]=[]# (frequency, element)forelement,freqinfrequency_map.items():iflen(min_heap)<k:heapq.heappush(min_heap,(freq,element))eliffreq>min_heap[0][0]:heapq.heapreplace(min_heap,(freq,element))# Extract elements (frequency is index 0, element is index 1)return[elementforfreq,elementinmin_heap]
classSolutionBucket:deftopKFrequent(self,nums:List[int],k:int)->List[int]:""" Bucket sort approach: O(n) time. Key Insight: - Frequency is bounded: 1 <= freq <= n - Use array index as frequency bucket - Iterate from highest frequency bucket Why O(n)? - At most n distinct elements - At most n buckets (frequency 1 to n) - Single pass through buckets """frequency_map:dict[int,int]=Counter(nums)# Bucket: index = frequency, value = list of elements with that frequency# Frequency can be at most len(nums)buckets:list[list[int]]=[[]for_inrange(len(nums)+1)]forelement,freqinfrequency_map.items():buckets[freq].append(element)# Collect k elements starting from highest frequencyresult:list[int]=[]forfreqinrange(len(nums),0,-1):forelementinbuckets[freq]:result.append(element)iflen(result)==k:returnresultreturnresult
Problem: Design a data structure that supports adding numbers and finding the median. Pattern: Two heaps (max-heap for lower half, min-heap for upper half) Role: BASE TEMPLATE for streaming median problems.
# Pattern: heap_median_stream# See: docs/patterns/heap/templates.md Section 3importheapqclassMedianFinder:""" Maintain median of a data stream using two heaps. Data Structure Design: - lower_half (max-heap): stores smaller half of numbers - upper_half (min-heap): stores larger half of numbers Invariants: 1. All elements in lower_half <= all elements in upper_half 2. Size difference: |lower_half| - |upper_half| <= 1 3. If odd count, lower_half has one more element Why Two Heaps? - Median requires knowing the middle element(s) - Heaps give O(1) access to extreme values - Max-heap for lower half β O(1) access to largest of smaller numbers - Min-heap for upper half β O(1) access to smallest of larger numbers """def__init__(self):# Max-heap for lower half (store negatives for max-heap behavior)self.lower_half:list[int]=[]# max-heap via negation# Min-heap for upper halfself.upper_half:list[int]=[]# min-heapdefaddNum(self,num:int)->None:""" Add number while maintaining heap invariants. Strategy: 1. Always add to lower_half first (max-heap) 2. Move largest from lower to upper (balances the halves) 3. If upper becomes larger, move smallest back to lower This ensures: - Elements are correctly partitioned (lower <= upper) - Sizes are balanced (lower has equal or one more) """# Step 1: Add to lower half (max-heap, store negative)heapq.heappush(self.lower_half,-num)# Step 2: Move largest from lower to upper# This ensures lower_half max <= upper_half minlargest_lower=-heapq.heappop(self.lower_half)heapq.heappush(self.upper_half,largest_lower)# Step 3: Rebalance if upper has more elementsiflen(self.upper_half)>len(self.lower_half):smallest_upper=heapq.heappop(self.upper_half)heapq.heappush(self.lower_half,-smallest_upper)deffindMedian(self)->float:""" Return median in O(1) time. Cases: - Odd count: median is the root of lower_half (the extra element) - Even count: median is average of both roots """iflen(self.lower_half)>len(self.upper_half):# Odd count: lower has one morereturnfloat(-self.lower_half[0])else:# Even count: average of two middle elementsreturn(-self.lower_half[0]+self.upper_half[0])/2.0
# Pattern: merge_k_sorted_heap# See: docs/patterns/heap/templates.md Section 4importheapqfromtypingimportList,OptionalclassListNode:def__init__(self,val:int=0,next:'ListNode'=None):self.val=valself.next=nextclassSolutionHeap:defmergeKLists(self,lists:List[Optional[ListNode]])->Optional[ListNode]:""" Merge k sorted lists using min-heap. Key Insight: - At any point, the next element in the merged list is the smallest among all current list heads - Min-heap gives O(log k) access to the smallest head - After popping, push the successor node Why (val, index, node) tuple? - val: primary sort key - index: tie-breaker (ListNode not comparable) - node: reference to actual node for result building """ifnotlists:returnNone# Min-heap: (node_value, list_index, node)# list_index breaks ties when values are equalmin_heap:list[tuple[int,int,ListNode]]=[]# Initialize heap with all non-empty list headsfori,nodeinenumerate(lists):ifnode:heapq.heappush(min_heap,(node.val,i,node))# Dummy head simplifies list constructiondummy=ListNode(0)tail=dummywhilemin_heap:# Pop the smallest nodeval,i,node=heapq.heappop(min_heap)# Append to resulttail.next=nodetail=tail.next# Push successor if existsifnode.next:heapq.heappush(min_heap,(node.next.val,i,node.next))returndummy.next
Problem: Given an array of meeting time intervals, find the minimum number of conference rooms required. Pattern: Greedy assignment with min-heap of end times Variant: Interval scheduling with heap
# Pattern: heap_interval_scheduling# See: docs/patterns/heap/templates.md Section 5importheapqclassSolutionHeap:defminMeetingRooms(self,intervals:List[List[int]])->int:""" Find minimum meeting rooms using min-heap of end times. Key Insight: - Sort meetings by start time (process chronologically) - Min-heap tracks when each room becomes free (end times) - For each meeting: if starts after earliest end, reuse room - Otherwise, allocate new room Why min-heap of end times? - We need the room that frees up earliest - If new meeting starts >= earliest end, room can be reused - Pop the old end time, push new end time """ifnotintervals:return0# Sort by start timeintervals.sort(key=lambdax:x[0])# Min-heap of end times (when rooms become free)end_times:list[int]=[]forstart,endinintervals:# Check if earliest-ending room is now freeifend_timesandend_times[0]<=start:# Room is free, reuse it (pop old end, push new end)heapq.heapreplace(end_times,end)else:# All rooms busy, allocate new roomheapq.heappush(end_times,end)# Number of rooms = size of heapreturnlen(end_times)
classSolutionSweepLine:defminMeetingRooms(self,intervals:List[List[int]])->int:""" Sweep line: track events at each time point. Events: - +1 at start time (meeting begins) - -1 at end time (meeting ends) Max concurrent events = max rooms needed. """events=[]forstart,endinintervals:events.append((start,1))# Meeting startsevents.append((end,-1))# Meeting ends# Sort by time; if same time, process ends before starts# (room frees up before new meeting uses it)events.sort(key=lambdax:(x[0],x[1]))max_rooms=0current_rooms=0fortime,deltainevents:current_rooms+=deltamax_rooms=max(max_rooms,current_rooms)returnmax_rooms
Problem: Schedule tasks with cooldown constraint n between same tasks. Return minimum time units. Pattern: Greedy scheduling with max-heap Variant: Heap + greedy for optimal ordering
Tasks: A A A B B B, n = 2
Greedy: Always schedule most frequent available task
Optimal schedule:
A B _ A B _ A B
1 2 3 4 5 6 7 8
Total time = 8 units
Why most frequent first?
- Reduces idle time by distributing high-frequency tasks
- Idle slots appear when no task is available (all in cooldown)
# Pattern: heap_task_scheduler# See: docs/patterns/heap/templates.md Section 6importheapqfromcollectionsimportCounter,dequeclassSolutionHeap:defleastInterval(self,tasks:List[str],n:int)->int:""" Schedule tasks with cooldown using max-heap. Strategy: 1. Greedily pick most frequent available task 2. Track cooldown using a queue (task becomes available at time t) 3. If no task available, idle (time still advances) Why max-heap? - We want to process high-frequency tasks first - This minimizes idle time at the end - Max-heap gives O(log k) access to most frequent """# Count task frequenciestask_counts=Counter(tasks)# Max-heap of remaining counts (negate for max-heap)max_heap=[-countforcountintask_counts.values()]heapq.heapify(max_heap)# Queue of (available_time, remaining_count) for tasks in cooldowncooldown_queue:deque[tuple[int,int]]=deque()time=0whilemax_heaporcooldown_queue:time+=1# Check if any task exits cooldownifcooldown_queueandcooldown_queue[0][0]==time:available_time,remaining=cooldown_queue.popleft()heapq.heappush(max_heap,-remaining)ifmax_heap:# Execute most frequent available taskcount=-heapq.heappop(max_heap)count-=1ifcount>0:# Task has more instances, put in cooldowncooldown_queue.append((time+n+1,count))# else: idle (no task available)returntime
Input: tasks = ['A','A','A','B','B','B'], n = 2
Initial:
Counts: A=3, B=3
max_heap: [-3, -3]
cooldown_queue: []
Time 1: Pop A (count=3β2), cooldown until time=4
heap: [-3], queue: [(4, 2)]
Time 2: Pop B (count=3β2), cooldown until time=5
heap: [], queue: [(4, 2), (5, 2)]
Time 3: Idle (heap empty, queue not ready)
heap: [], queue: [(4, 2), (5, 2)]
Time 4: A exits cooldown, pop A (count=2β1)
heap: [-2], queue: [(5, 2), (7, 1)]
Actually: A exits, push to heap, then pop
... continue ...
Final time = 8
classSolutionMath:defleastInterval(self,tasks:List[str],n:int)->int:""" Mathematical approach based on most frequent task. Observation: - Most frequent task determines the "frame" - Frame: chunks of size (n+1) with one instance of max-freq task Formula: - frames = (max_freq - 1) - frame_size = n + 1 - base_time = frames * frame_size - Add tasks with max_freq (they fill the last partial frame) Edge case: - If total tasks > formula result, no idle needed """task_counts=Counter(tasks)max_freq=max(task_counts.values())max_freq_count=sum(1forcountintask_counts.values()ifcount==max_freq)# Formula: (max_freq - 1) frames Γ (n + 1) slots + max_freq_count final slotsformula_time=(max_freq-1)*(n+1)+max_freq_count# Actual time is max of formula and total tasksreturnmax(formula_time,len(tasks))
tasks = [A,A,A,A,B,B,B,C,C], n = 2, max_freq = 4 (task A)
Formula visualization:
Frame 1: A _ _
Frame 2: A _ _
Frame 3: A _ _
Frame 4: A
Slots per frame = n + 1 = 3
Frames (excluding last) = max_freq - 1 = 3
Base slots = 3 Γ 3 = 9
Plus final A = 1
Total slots = 10 (minimum, may need more if many tasks)
Problem: Smash two heaviest stones repeatedly, return weight of last stone (or 0). Pattern: Greedy simulation with max-heap Variant: Simple heap operations for repeated selection
# Pattern: heap_greedy_simulation# See: docs/patterns/heap/templates.md Section 7importheapqclassSolutionHeap:deflastStoneWeight(self,stones:List[int])->int:""" Simulate stone smashing using max-heap. Game Rules: 1. Pick two heaviest stones x, y (x <= y) 2. If x == y: both destroyed 3. If x != y: stone of weight y - x remains Why max-heap? - Need repeated access to two largest elements - Python heapq is min-heap, so negate values - O(log n) per operation vs O(n) for linear search """# Convert to max-heap (negate all values)max_heap=[-stoneforstoneinstones]heapq.heapify(max_heap)# O(n) heapify# Simulate until 0 or 1 stone remainswhilelen(max_heap)>1:# Pop two largest (remember to negate)largest=-heapq.heappop(max_heap)second=-heapq.heappop(max_heap)# If different weights, push the differenceiflargest!=second:heapq.heappush(max_heap,-(largest-second))# Return last stone weight (or 0 if no stones left)return-max_heap[0]ifmax_heapelse0
Time breakdown: - Heapify: O(n) - Each smash: 2 pops + 0-1 push = O(log n) - Total smashes: at most n-1 (each reduces count by at least 1) - Total: O(n) + O(n log n) = O(n log n)
fromcollectionsimportCounterimportheapqdeftop_k_frequent(nums:List[int],k:int)->List[int]:"""Find k most frequent elements. Time: O(n + m log k)"""freq=Counter(nums)min_heap=[]forelement,countinfreq.items():iflen(min_heap)<k:heapq.heappush(min_heap,(count,element))elifcount>min_heap[0][0]:heapq.heapreplace(min_heap,(count,element))return[elemfor_,eleminmin_heap]
importheapqdefsimulate_max_heap(items:List[int])->int:"""Process items by repeatedly taking largest. Time: O(n log n)"""max_heap=[-xforxinitems]heapq.heapify(max_heap)whilelen(max_heap)>1:largest=-heapq.heappop(max_heap)second=-heapq.heappop(max_heap)# Process largest and secondresult=process(largest,second)ifresult>0:heapq.heappush(max_heap,-result)return-max_heap[0]ifmax_heapelse0
importheapq# Create heapheap=[]# Empty heapheapq.heapify(list)# Convert list to heap O(n)# Add elementheapq.heappush(heap,val)# Add element O(log n)# Remove smallestsmallest=heapq.heappop(heap)# Remove and return O(log n)# Peek smallestsmallest=heap[0]# View without removing O(1)# Push then pop (optimized)heapq.heappushpop(heap,val)# Push val, pop smallest O(log n)# Pop then push (optimized)heapq.heapreplace(heap,val)# Pop smallest, push val O(log n)# Max-heap using negationmax_heap=[-xforxinitems]heapq.heapify(max_heap)largest=-heapq.heappop(max_heap)
Document generated for NeetCode Practice Framework β API Kernel: HeapTopK