Export your data from Visual Layer to use in external tools or workflows. You can export the entire dataset, a filtered subset, or just specific items—depending on your needs.
How This Helps
Export datasets for use in training pipelines, labeling platforms, analytics workflows, or external review. You can export everything or just what you need.
We’ll use the following class to encapsulate the API calls:
```python
import time
from typing import Any, Dict

import requests


class DatasetExporter:
    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")
        self.headers = {"Accept": "application/json, text/plain, */*"}

    def initiate_export(
        self,
        dataset_id: str,
        file_name: str,
        export_format: str = "json",
        include_images: bool = False,
    ) -> Dict[str, Any]:
        """
        Initiate an export of a dataset.

        Args:
            dataset_id: The ID of the dataset to export
            file_name: Name of the export file
            export_format: Format of the export (default: 'json')
            include_images: Whether to include images in export (default: False)

        Returns:
            Dict containing the export task information
        """
        url = f"{self.base_url}/api/v1/dataset/{dataset_id}/export_context_async"
        params = {
            "file_name": file_name,
            "export_format": export_format,
            "include_images": str(include_images).lower(),
        }
        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()
        return response.json()

    def check_export_status(
        self, dataset_id: str, export_task_id: str
    ) -> Dict[str, Any]:
        """
        Check the status of an export task.

        Args:
            dataset_id: The ID of the dataset
            export_task_id: The ID of the export task to check

        Returns:
            Dict containing the status information
        """
        url = f"{self.base_url}/api/v1/dataset/{dataset_id}/export_status"
        params = {"export_task_id": export_task_id}
        response = requests.get(url, headers=self.headers, params=params)
        response.raise_for_status()
        return response.json()

    def wait_for_export(
        self,
        dataset_id: str,
        export_task_id: str,
        check_interval: int = 5,
        timeout: int = 300,
    ) -> Dict[str, Any]:
        """
        Wait for an export task to complete.

        Args:
            dataset_id: The ID of the dataset
            export_task_id: The ID of the export task to check
            check_interval: Time in seconds between status checks (default: 5)
            timeout: Maximum time to wait in seconds (default: 300)

        Returns:
            Dict containing the final status information

        Raises:
            TimeoutError: If the export doesn't complete within the timeout period
        """
        start_time = time.time()
        while True:
            status = self.check_export_status(dataset_id, export_task_id)
            if status["status"] == "COMPLETED":
                return status
            if status["status"] == "FAILED":
                raise Exception(f"Export failed: {status.get('result_message')}")
            if time.time() - start_time > timeout:
                raise TimeoutError("Export timed out")
            time.sleep(check_interval)

    def download_export(self, download_uri: str, output_path: str) -> None:
        """
        Download the exported file from the given URI.

        Args:
            download_uri: The URI path from the export status
            output_path: Local path where the file should be saved

        Returns:
            None
        """
        url = f"{self.base_url}{download_uri}"
        response = requests.get(url, headers=self.headers, stream=True)
        response.raise_for_status()
        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
```
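Here is a minimal sketch of how the pieces fit together: initiate the export, wait for it to complete, then download the result. The base URL and dataset ID are placeholders, and the response keys used below (`id` and `download_uri`) are assumptions; check the actual payloads returned by your Visual Layer instance.

```python
# Minimal usage sketch. Base URL and dataset ID are placeholders;
# the "id" and "download_uri" keys are assumptions about the response shape.
exporter = DatasetExporter(base_url="https://your-visual-layer-host")

task = exporter.initiate_export(
    dataset_id="YOUR_DATASET_ID",
    file_name="my_export",
    export_format="json",
    include_images=False,
)

# Poll until the export finishes (raises on failure or timeout)
status = exporter.wait_for_export(
    dataset_id="YOUR_DATASET_ID",
    export_task_id=task["id"],  # assumed key name
)

# Download the finished export file
exporter.download_export(
    download_uri=status["download_uri"],  # assumed key name
    output_path="export.zip",
)
```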
First, let’s load the exported metadata into a pandas dataframe:
```python
import json

import pandas as pd

with open("food101/metadata.json") as f:
    data = json.load(f)

# This is the dataset level information
info_df = pd.DataFrame([data["info"]])

# This is the image level information
media_items_df = pd.json_normalize(data["media_items"])
```
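To get a quick feel for what was exported, you can inspect both DataFrames. The exact columns depend on your dataset and the enrichments enabled, so treat this as a sanity check rather than a fixed schema:

```python
# Quick look at the export contents; column names vary by dataset.
print(info_df.T)                         # dataset-level metadata
print(media_items_df.columns.tolist())   # available image-level fields
print(media_items_df.head())
```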
The metadata_items column contains a list of metadata entries for each image, including any detected issues. We can filter for images with duplicate issues above a certain confidence threshold:
```python
def has_duplicate_issue(metadata_items, confidence_threshold=0.8):
    if not isinstance(metadata_items, list):
        return False
    for item in metadata_items:
        if (
            item.get("type") == "issue"
            and item.get("properties", {}).get("issue_type") == "duplicates"
            and item.get("properties", {}).get("confidence", 0) > confidence_threshold
        ):
            return True
    return False


# Replace with your confidence threshold
CONFIDENCE_THRESHOLD = 0.8

duplicate_df = media_items_df[
    media_items_df["metadata_items"].apply(
        lambda x: has_duplicate_issue(x, confidence_threshold=CONFIDENCE_THRESHOLD)
    )
]
duplicate_df
```
This table includes images with duplicate issues above a confidence threshold of 0.8.
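A common follow-up is to drop the flagged items and keep a cleaned file list, for example to feed into a training pipeline. This is a sketch; it assumes the export exposes a file_name field per media item, as in the video example below:

```python
# Sketch: build a cleaned file list by excluding flagged duplicates.
# Assumes a "file_name" column exists on the media items.
clean_df = media_items_df[~media_items_df.index.isin(duplicate_df.index)]
clean_files = clean_df["file_name"].tolist()
print(f"Kept {len(clean_files)} of {len(media_items_df)} images after removing duplicates")
```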
For video datasets, the export also includes video_info and image_label metadata. The following code flattens each frame into a record with its video name, frame number, timestamp, and labels:

```python
import json
import re

import pandas as pd

# Load JSON file
with open("metadata 8.json", "r") as f:
    data = json.load(f)

# Prepare list of records for the DataFrame
records = []

# Iterate over media items
for item in data.get("media_items", []):
    video_name = None
    frame_timestamp = None
    categories = []
    file_name = item.get("file_name", "")

    for metadata in item.get("metadata_items", []):
        if metadata["type"] == "video_info":
            video_name = metadata["properties"].get("video_name")
            frame_timestamp = metadata["properties"].get("frame_timestamp")
        elif metadata["type"] == "image_label":
            categories.append(metadata["properties"].get("category_name"))

    # Extract frame number from the file name
    match = re.search(r"output_(\d+)\.jpg", file_name)
    frame_number = int(match.group(1)) if match else None

    # Append record with categories as a list
    records.append({
        "video_name": video_name,
        "file_name": file_name,
        "frame_number": frame_number,
        "time_in_video": frame_timestamp,
        "categories": categories,
    })

# Convert to DataFrame
df = pd.DataFrame(records)
print(df.tail())
```
For datasets with video frames, you can analyze similarity between frames to understand which videos are most similar. This is useful for identifying duplicate content, finding related videos, or grouping similar content.
Now let’s create a table that shows video-to-video similarity. This analysis only compares frames between different videos, excluding similarities within the same video:
```python
from collections import defaultdict

# Create a mapping of frames to their duplicate groups
frame_to_groups = defaultdict(set)
group_to_frames = defaultdict(set)

for idx, row in frames_df.iterrows():
    media_id = row["media_id"]
    video_name = row["video_name"]

    for similar_frame in row["similar_frames"]:
        group_id = similar_frame["group_id"]
        confidence = similar_frame["confidence"]

        # Only consider high-confidence similarities (>= 0.95)
        if confidence >= 0.95:
            frame_to_groups[media_id].add(group_id)
            group_to_frames[group_id].add((media_id, video_name, confidence))

# Create video-to-video similarity records
video_similarity_records = []

# Get all unique videos
all_videos = frames_df["video_name"].unique()

for video_a in all_videos:
    for video_b in all_videos:
        if video_a != video_b:
            # Get frames from both videos
            frames_a = frames_df[frames_df["video_name"] == video_a]["media_id"].tolist()
            frames_b = frames_df[frames_df["video_name"] == video_b]["media_id"].tolist()

            # Find similarities between frames from different videos
            inter_video_similarities = []
            shared_groups = set()

            for frame_a in frames_a:
                for frame_b in frames_b:
                    groups_a = frame_to_groups.get(frame_a, set())
                    groups_b = frame_to_groups.get(frame_b, set())

                    # Find groups that contain both frames (cross-video similarity)
                    common_groups = groups_a.intersection(groups_b)
                    if common_groups:
                        shared_groups.update(common_groups)

                        # For each shared group, get the similarity confidence.
                        # We know these frames are from different videos by construction.
                        for group_id in common_groups:
                            frames_in_group = group_to_frames[group_id]

                            # Get confidence scores for frames from both videos in this group
                            confidences_in_group = [conf for _, _, conf in frames_in_group]
                            if confidences_in_group:
                                # Use the average confidence for this similarity group
                                inter_video_similarities.append(
                                    sum(confidences_in_group) / len(confidences_in_group)
                                )

            if inter_video_similarities:
                avg_similarity = sum(inter_video_similarities) / len(inter_video_similarities)
                num_similar_frames = len(shared_groups)

                video_similarity_records.append({
                    "video_a": video_a,
                    "video_b": video_b,
                    "average_similarity": round(avg_similarity, 4),
                    "number_of_similar_frames": num_similar_frames,
                })

# Create the final similarity DataFrame
similarity_df = pd.DataFrame(video_similarity_records)

# Remove duplicates (keep only one direction of each pair)
similarity_df = similarity_df[similarity_df["video_a"] < similarity_df["video_b"]]

# Sort by average similarity (descending)
similarity_df = similarity_df.sort_values("average_similarity", ascending=False)

print(f"Video pairs with similarities: {len(similarity_df)}")
print("\nTop 10 most similar video pairs:")
print(similarity_df.head(10))
```
You can also use the cluster_id field to find similar frames:
```python
# Create similarity table using cluster IDs
cluster_similarity_records = []

# Group frames by cluster
cluster_groups = frames_df.groupby("cluster_id")

for cluster_id, cluster_frames in cluster_groups:
    if len(cluster_frames) > 1:  # Only clusters with multiple frames
        videos_in_cluster = cluster_frames["video_name"].unique()

        # Create pairs of videos in the same cluster
        for i, video_a in enumerate(videos_in_cluster):
            for video_b in videos_in_cluster[i + 1:]:
                frames_a_in_cluster = len(cluster_frames[cluster_frames["video_name"] == video_a])
                frames_b_in_cluster = len(cluster_frames[cluster_frames["video_name"] == video_b])

                cluster_similarity_records.append({
                    "video_a": video_a,
                    "video_b": video_b,
                    "cluster_id": cluster_id,
                    "frames_from_video_a": frames_a_in_cluster,
                    "frames_from_video_b": frames_b_in_cluster,
                    "total_frames_in_cluster": len(cluster_frames),
                })

# Create DataFrame
cluster_similarity_df = pd.DataFrame(cluster_similarity_records)

# Aggregate by video pair
video_cluster_summary = cluster_similarity_df.groupby(["video_a", "video_b"]).agg({
    "cluster_id": "count",  # Number of shared clusters
    "frames_from_video_a": "sum",
    "frames_from_video_b": "sum",
}).rename(columns={"cluster_id": "shared_clusters"}).reset_index()

print("\nVideo similarity based on shared clusters:")
print(video_cluster_summary.sort_values("shared_clusters", ascending=False).head())
```
For datasets with mislabel detection, you can analyze which labels are potentially incorrect and what the suggested corrections are. This is valuable for improving dataset quality and understanding systematic labeling issues.
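As a starting point, you can reuse the same metadata_items filtering pattern from the duplicates example. The issue_type value ("mislabels") and the property names used here are assumptions; inspect the metadata_items entries in your own export to confirm the actual field names.

```python
# Sketch: collect potentially mislabeled images above a confidence threshold.
# The "mislabels" issue_type and the returned properties are assumptions about
# the export schema; verify them against your metadata_items entries.
def get_mislabel_issues(metadata_items, confidence_threshold=0.8):
    issues = []
    if not isinstance(metadata_items, list):
        return issues
    for item in metadata_items:
        props = item.get("properties", {})
        if (
            item.get("type") == "issue"
            and props.get("issue_type") == "mislabels"
            and props.get("confidence", 0) > confidence_threshold
        ):
            issues.append(props)
    return issues


mislabel_df = media_items_df.copy()
mislabel_df["mislabel_issues"] = mislabel_df["metadata_items"].apply(get_mislabel_issues)
mislabel_df = mislabel_df[mislabel_df["mislabel_issues"].apply(len) > 0]
print(f"Found {len(mislabel_df)} potentially mislabeled images")
```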