computer-visiondatasetcocosqlitedata-engineering

COCO 형식 대용량 Annotation JSON을 SQLite DB로 변환하기

3 min read

소스코드

배경

COCO 형식의 annotation JSON 파일은 규모가 작은 경우 필요할 때마다 JSON을 직접 로드하여 처리해도 문제가 되지 않는다. 그러나 annotation 파일 하나의 크기가 수 GB 단위에 이르는 경우 매번 JSON 파일을 로드하여 필요한 subset에 해당하는 annotation 데이터를 추출하는 방식은 다음과 같은 문제를 야기한다.

  • JSON 전체 로딩으로 인한 과도한 메모리 사용
  • 필요한 subset 추출 시 매번 수 분 이상의 대기 시간 발생
  • 반복적인 전처리 작업으로 인한 생산성 저하

실제로 업무 중 Objects365 데이터셋에서 매 서브셋 폴더마다 특정 클래스에 해당하는 데이터만 추출하는 작업이 필요했는데, annotation JSON이 5.5GB 크기의 단일 파일로 제공되어 파싱 및 필터링에 상당한 시간이 소요되었다.

이에 따라 annotation JSON을 SQLite DB로 변환한 뒤 필요한 조건에 따라 SQL로 빠르게 subset을 추출하는 방식으로 파이프라인을 개선하였다.

기존 접근 방식의 한계

현업에서는 pycocotools 를 이용하여 JSON을 로드한 뒤, 전체 annotation을 메모리에 적재하는 방식을 사용하였다. 그러나 개인 개발 환경에서 JSON을 로드하는 과정에서 메모리 부족 문제가 발생하였다.

  • COCO JSON 전체를 메모리에 로드하는 방식은 대용량 데이터셋에 부적합함
  • Streaming 방식의 파싱을 사용하여 메모리 사용량을 줄여야 함

개선: ijson 기반 스트리밍 처리

대용량 JSON 파일을 메모리에 모두 적재하지 않기 위해 ijson 라이브러리를 이용하여 annotation, images, categories 항목을 스트리밍 방식으로 파싱하도록 구조를 변경하였다.

이를 통해:

  • JSON 파일 전체 로드 없이 순차 처리
  • 메모리 사용량 대폭 감소
  • 대용량 annotation 파일 처리 안정성 확보

구현은 다음과 같은 단계로 구성하였다.

  1. categories 스트리밍 파싱 및 INSERT
  2. images 스트리밍 파싱 및 INSERT
  3. annotations 스트리밍 파싱 및 batch INSERT
def build_coco_database_streaming(json_path: Path, db_path: Path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    create_tables(cursor)
    conn.commit()

    print("[1/3] Streaming categories...")
    stream_categories(json_path, cursor, conn)

    print("[2/3] Streaming images...")
    stream_images(json_path, cursor, conn)

    print("[3/3] Streaming annotations...")
    stream_annotations(json_path, cursor, conn)

    print("Creating indexes...")
    create_indexes(cursor, conn)

    conn.close()
    print("Done.")

Batch INSERT 및 Decimal 처리 이슈

annotations 항목은 전체 데이터셋 기준으로 수천만 건에 이를 수 있기 때문에 단건 INSERT 방식은 시간 효율이 낮을 수 있다. 이에 따라 일정 개수 단위로 batch를 구성하여 executemany 를 이용한 batch INSERT 방식으로 처리하였다.

또한 ijson 파성 과정에서 Decimal 타입이 포함되어 SQLite 바인딩 및 JSON 직렬화 과정에서 오류가 발생하는 문제가 있었으며, 이를 다음과 같은 방식으로 처리하였다.

  • Decimalfloat / int 로 명시적 변환
  • batch INSERT 이전에 타입 정규화 수행
def normalize_json_numbers(obj):
    """
    Recursively convert Decimal to float for JSON serialization.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    elif isinstance(obj, list):
        return [normalize_json_numbers(x) for x in obj]
    elif isinstance(obj, dict):
        return {k: normalize_json_numbers(v) for k, v in obj.items()}
    else:
        return obj
def stream_annotations(json_path, cursor, conn):
    batch = []
    with open(json_path, "rb") as f:
        for ann in tqdm(ijson.items(f, "annotations.item"), desc="Annotations"):
            bbox = normalize_json_numbers(ann.get("bbox", []))
            segmentation = normalize_json_numbers(ann.get("segmentation", []))

            area = ann.get("area", 0.0)
            iscrowd = ann.get("iscrowd", 0)

            batch.append(
                (
                    int(ann["id"]),
                    int(ann["image_id"]),
                    int(ann["category_id"]),
                    json.dumps(bbox),
                    float(area),
                    int(iscrowd),
                    json.dumps(segmentation),
                )
            )

            if len(batch) >= BATCH_SIZE:
                insert_batch(
                    cursor,
                    """
                    INSERT INTO annotations 
                    (id, image_id, category_id, bbox, area, iscrowd, segmentation)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    batch,
                )
                conn.commit()
                batch.clear()

이를 통해:

  • 대량 INSERT 성능 개선
  • SQLite 바인딩 오류 방지
  • JSON 직렬화 오류 방지

와 같은 효과를 얻을 수 있었다.

데이터 적재 후 인덱스 생성

annotation 테이블은 이후 다음과 같은 쿼리에 반복적으로 사용될 예정이었다.

  • 특정 image_id에 대한 annotation 조회
  • 특정 category_id에 대한 필터링
  • 서브셋 폴더 내 이미지 목록과 JOIN

이에 따라 데이터 적재 완료 이후, 다음 컬럼에 대해 인덱스를 생성하였다.

  • annotations.image_id
  • annotations.category_id

대량 INSERT시 인덱스 유지 비용으로 인해 적재 성능이 저하되는 것을 방지하기 위하여 모든 데이터의 적재가 완료된 후 인덱스를 생성하도록 구성하였다.

def create_indexes(cursor, conn):
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_annotations_image_id
        ON annotations(image_id)
    """)
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_annotations_category_id
        ON annotations(category_id)
    """)
    conn.commit()

결과

SQLite DB로 변환한 후:

  • 특정 서브셋에 해당하는 이미지들의 annotation 추출
  • 특정 클래스만 포함하는 이미지 필터링

과 같은 작업을 수 초 내에 처리할 수 있게 되었다.