COCO 형식 대용량 Annotation JSON을 SQLite DB로 변환하기
배경
COCO 형식의 annotation JSON 파일은 규모가 작은 경우 필요할 때마다 JSON을 직접 로드하여 처리해도 문제가 되지 않는다. 그러나 annotation 파일 하나의 크기가 수 GB 단위에 이르는 경우 매번 JSON 파일을 로드하여 필요한 subset에 해당하는 annotation 데이터를 추출하는 방식은 다음과 같은 문제를 야기한다.
- JSON 전체 로딩으로 인한 과도한 메모리 사용
- 필요한 subset 추출 시 매번 수 분 이상의 대기 시간 발생
- 반복적인 전처리 작업으로 인한 생산성 저하
실제로 업무 중 Objects365 데이터셋에서 매 서브셋 폴더마다 특정 클래스에 해당하는 데이터만 추출하는 작업이 필요했는데, annotation JSON이 5.5GB 크기의 단일 파일로 제공되어 파싱 및 필터링에 상당한 시간이 소요되었다.
이에 따라 annotation JSON을 SQLite DB로 변환한 뒤 필요한 조건에 따라 SQL로 빠르게 subset을 추출하는 방식으로 파이프라인을 개선하였다.
기존 접근 방식의 한계
현업에서는 pycocotools 를 이용하여 JSON을 로드한 뒤, 전체 annotation을 메모리에 적재하는 방식을 사용하였다. 그러나 개인 개발 환경에서 JSON을 로드하는 과정에서 메모리 부족 문제가 발생하였다.
- COCO JSON 전체를 메모리에 로드하는 방식은 대용량 데이터셋에 부적합함
- Streaming 방식의 파싱을 사용하여 메모리 사용량을 줄여야 함
개선: ijson 기반 스트리밍 처리
대용량 JSON 파일을 메모리에 모두 적재하지 않기 위해 ijson 라이브러리를 이용하여 annotation, images, categories 항목을 스트리밍 방식으로 파싱하도록 구조를 변경하였다.
이를 통해:
- JSON 파일 전체 로드 없이 순차 처리
- 메모리 사용량 대폭 감소
- 대용량 annotation 파일 처리 안정성 확보
구현은 다음과 같은 단계로 구성하였다.
- categories 스트리밍 파싱 및 INSERT
- images 스트리밍 파싱 및 INSERT
- annotations 스트리밍 파싱 및 batch INSERT
def build_coco_database_streaming(json_path: Path, db_path: Path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
create_tables(cursor)
conn.commit()
print("[1/3] Streaming categories...")
stream_categories(json_path, cursor, conn)
print("[2/3] Streaming images...")
stream_images(json_path, cursor, conn)
print("[3/3] Streaming annotations...")
stream_annotations(json_path, cursor, conn)
print("Creating indexes...")
create_indexes(cursor, conn)
conn.close()
print("Done.") Batch INSERT 및 Decimal 처리 이슈
annotations 항목은 전체 데이터셋 기준으로 수천만 건에 이를 수 있기 때문에 단건 INSERT 방식은 시간 효율이 낮을 수 있다. 이에 따라 일정 개수 단위로 batch를 구성하여 executemany 를 이용한 batch INSERT 방식으로 처리하였다.
또한 ijson 파성 과정에서 Decimal 타입이 포함되어 SQLite 바인딩 및 JSON 직렬화 과정에서 오류가 발생하는 문제가 있었으며, 이를 다음과 같은 방식으로 처리하였다.
Decimal→float/int로 명시적 변환- batch INSERT 이전에 타입 정규화 수행
def normalize_json_numbers(obj):
"""
Recursively convert Decimal to float for JSON serialization.
"""
if isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, list):
return [normalize_json_numbers(x) for x in obj]
elif isinstance(obj, dict):
return {k: normalize_json_numbers(v) for k, v in obj.items()}
else:
return obj def stream_annotations(json_path, cursor, conn):
batch = []
with open(json_path, "rb") as f:
for ann in tqdm(ijson.items(f, "annotations.item"), desc="Annotations"):
bbox = normalize_json_numbers(ann.get("bbox", []))
segmentation = normalize_json_numbers(ann.get("segmentation", []))
area = ann.get("area", 0.0)
iscrowd = ann.get("iscrowd", 0)
batch.append(
(
int(ann["id"]),
int(ann["image_id"]),
int(ann["category_id"]),
json.dumps(bbox),
float(area),
int(iscrowd),
json.dumps(segmentation),
)
)
if len(batch) >= BATCH_SIZE:
insert_batch(
cursor,
"""
INSERT INTO annotations
(id, image_id, category_id, bbox, area, iscrowd, segmentation)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
batch,
)
conn.commit()
batch.clear() 이를 통해:
- 대량 INSERT 성능 개선
- SQLite 바인딩 오류 방지
- JSON 직렬화 오류 방지
와 같은 효과를 얻을 수 있었다.
데이터 적재 후 인덱스 생성
annotation 테이블은 이후 다음과 같은 쿼리에 반복적으로 사용될 예정이었다.
- 특정 image_id에 대한 annotation 조회
- 특정 category_id에 대한 필터링
- 서브셋 폴더 내 이미지 목록과 JOIN
이에 따라 데이터 적재 완료 이후, 다음 컬럼에 대해 인덱스를 생성하였다.
- annotations.image_id
- annotations.category_id
대량 INSERT시 인덱스 유지 비용으로 인해 적재 성능이 저하되는 것을 방지하기 위하여 모든 데이터의 적재가 완료된 후 인덱스를 생성하도록 구성하였다.
def create_indexes(cursor, conn):
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_annotations_image_id
ON annotations(image_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_annotations_category_id
ON annotations(category_id)
""")
conn.commit() 결과
SQLite DB로 변환한 후:
- 특정 서브셋에 해당하는 이미지들의 annotation 추출
- 특정 클래스만 포함하는 이미지 필터링
과 같은 작업을 수 초 내에 처리할 수 있게 되었다.