그래프 DB 통합 가이드 — Neo4j 모델 · ETL · 추천 시스템 아키텍처
관계 중심 문제에 그래프를 적용하는 방법을 Neo4j 예제, 관계형→그래프 ETL(파이썬 코드 포함), 실전 추천 시스템 아키텍처까지 한 페이지로 정리했습니다.
1. Neo4j 데이터 모델 & 핵심 Cypher 예제
모델: 사용자(User), 상품(Product), 카테고리(Category), 이벤트(예: PURCHASED, VIEWED, RATED).
스키마 제약(권장)
CREATE CONSTRAINT unique_user_id IF NOT EXISTS
FOR (u:User) REQUIRE u.user_id IS UNIQUE;
CREATE CONSTRAINT unique_product_id IF NOT EXISTS
FOR (p:Product) REQUIRE p.product_id IS UNIQUE;
샘플 데이터 생성
/* 예제: 사용자·상품·구매 관계 생성 */
CREATE (u1:User {user_id:'u1', name:'홍길동'})
CREATE (u2:User {user_id:'u2', name:'김영희'})
CREATE (p1:Product {product_id:'p100', title:'무선이어폰'})
CREATE (p2:Product {product_id:'p200', title:'블루투스 스피커'})
CREATE (p3:Product {product_id:'p300', title:'보조배터리'})
CREATE (u1)-[:PURCHASED {ts: datetime('2025-11-01T09:12:00')}]->(p1)
CREATE (u1)-[:VIEWED {ts: datetime('2025-11-02T10:00:00')}]->(p2)
CREATE (u2)-[:PURCHASED {ts: datetime('2025-10-30T14:20:00')}]->(p1)
CREATE (u2)-[:PURCHASED {ts: datetime('2025-11-03T11:05:00')}]->(p3);
기본 추천 쿼리 (협업 필터링, 간단한 동료 기반)
사용자와 비슷한 사용자가 구매한 상품을 추천(해당 사용자가 아직 구매하지 않은 상품 우선)
MATCH (u:User {user_id:'u1'})-[:PURCHASED]->(p:Product)
WITH u, collect(p) AS myProducts
MATCH (u)-[:PURCHASED]->(p)<-[:PURCHASED]-(other:User)-[:PURCHASED]->(rec:Product)
WHERE NOT (u)-[:PURCHASED]->(rec)
RETURN rec.product_id AS product_id, rec.title AS title, count(*) AS score
ORDER BY score DESC, title
LIMIT 10;
간단한 그래프 알고리즘 예 — 공통 이웃 기반 유사도 (Neo4j Graph Data Science 사용)
사전 설치된 GDS 라이브러리 가정. 사용자 유사도 계산 예시(간단)
/* 예: 사용자 노드로 프로젝트 생성 후 commonNeighbors 유사도 계산 */
CALL gds.graph.project(
'userProductGraph',
['User','Product'],
{ PURCHASED: { orientation: 'UNDIRECTED' } }
);
CALL gds.nodeSimilarity.stream('userProductGraph')
YIELD node1, node2, similarity
WHERE gds.util.asNode(node1).user_id = 'u1' OR gds.util.asNode(node2).user_id = 'u1'
RETURN gds.util.asNode(node1).user_id AS userA,
gds.util.asNode(node2).user_id AS userB,
similarity
ORDER BY similarity DESC LIMIT 10;
참고: 실제 서비스에서는 '구매 가중치', '최근성 가중', '카테고리 필터' 등을 결합하면 추천 품질이 크게 향상됩니다.
2. 관계형 DB → 그래프 DB(Neo4j) ETL 가이드
목표: 기존 RDB(예: PostgreSQL)의 users, products, orders 테이블을 그래프 모델(노드·관계)로 적재.
1) 매핑(데이터 모델 변환)
| RDB 테이블/컬럼 | 그래프 모델 |
|---|---|
| users(id, name, email) | (:User {user_id:id, name, email}) |
| products(id, title, category_id) | (:Product {product_id:id, title}), (:Category {category_id}) |
| orders(id, user_id, product_id, created_at) | (User)-[:PURCHASED {ts:created_at}]->(Product) |
2) ETL 전략(권장)
- 1) 스키마 제약(Unique constraints) 먼저 생성
- 2) 배치 처리: 큰 테이블은 청크(예: 1,000~50,000 row)로 나눠 적재
- 3) idempotency 확보: MERGE 사용 또는 tx별 중복 체크
- 4) 에러 로깅 및 재시도 정책
- 5) 초기 로드 후 점진적 동기화(CDC 또는 변경 로그 사용)
3) 파이썬 ETL 예시 (Postgres → Neo4j)
설명: psycopg2로 Postgres에서 데이터를 읽어 neo4j 드라이버로 쓰는 간단한 예제. 실제 환경에서는 커넥션 풀·병렬화·재시도 로직을 추가하세요.
# Python 3 예시 (간단한 흐름)
# 필요 패키지: psycopg2, neo4j
# pip install psycopg2-binary neo4j
import psycopg2
from neo4j import GraphDatabase
from datetime import datetime
# --- 설정 ---
pg_conf = {"host":"PG_HOST","port":5432,"dbname":"db","user":"pguser","password":"pgpass"}
neo4j_uri = "neo4j+s://NEO4J_HOST:7687"
neo4j_auth = ("neo4j_user","neo4j_password")
# --- Postgres 연결 및 사용자 배치 로드 ---
pg_conn = psycopg2.connect(**pg_conf)
pg_cur = pg_conn.cursor(name='user_cursor') # server-side cursor for batching
pg_cur.itersize = 1000
pg_cur.execute("SELECT id, name, email FROM users;")
driver = GraphDatabase.driver(neo4j_uri, auth=neo4j_auth, encrypted=True)
def upsert_user(tx, user_id, name, email):
tx.run("""
MERGE (u:User {user_id: $user_id})
SET u.name = $name, u.email = $email
""", user_id=str(user_id), name=name, email=email)
with driver.session() as session:
for row in pg_cur:
uid, name, email = row
session.write_transaction(upsert_user, uid, name, email)
# --- orders 적재 예 (관계 생성) ---
pg_cur2 = pg_conn.cursor()
pg_cur2.execute("SELECT user_id, product_id, created_at FROM orders ORDER BY created_at;")
batch = []
with driver.session() as session:
for user_id, product_id, created_at in pg_cur2:
session.write_transaction(
lambda tx, u,p,ts: tx.run(
"MATCH (u:User {user_id:$u}),(p:Product {product_id:$p}) "
"MERGE (u)-[:PURCHASED {ts:$ts}]->(p)",
u=str(user_id), p=str(product_id), ts=str(created_at)
), user_id, product_id, created_at
)
pg_cur.close(); pg_cur2.close(); pg_conn.close(); driver.close()
4) CDC(변경 데이터 캡처) 연동 아이디어
- Debezium + Kafka → Kafka Connect → Neo4j Kafka Connector(또는 사용자 커넥터)로 실시간 반영
- 단순한 경우: DB trigger → 변경 로그 테이블 → 주기적 ETL(빠른 반영 불가)
운영 팁: 초기 대량 적재는 배치로, 그 이후 실시간 동기화는 CDC 기반이 가장 안정적입니다. Neo4j에서는 MERGE 사용 시 성능 주의(대량 MERGE는 인덱스 활용과 청크 나눔 필수).
3. 추천 시스템 아키텍처 (그래프 기반) — 다이어그램 + 설명
아키텍처 핵심 포인트
- 데이터 수집층: 기존 RDB는 정기 배치 ETL로 초기 로드, 이후 변경은 CDC(Debezium)로 스트리밍 연동.
- 그래프 저장소: Neo4j에 사용자·상품·행동(조회·구매)을 적재. 관계 중심 쿼리에 최적화.
- 분석층: GDS(Neo4j Graph Data Science)로 유사도·페이지랭크·퍼스널라이즈 점수 산출. 무거운 배치 작업은 별도의 분석 클러스터에서 수행 후 결과(추천 후보)를 다시 Neo4j나 별도 테이블에 저장.
- 서빙층: API 서버가 Neo4j를 직접 쿼리하거나, 사전 계산된 추천(또는 후보 + 랭킹)을 Redis에 캐시해 저지연 응답 제공.
실전용 추천 흐름 (예)
- 사용자 페이지 로드 → 캐시(Redis)에 추천있으면 반환
- 캐시에 없으면: API가 Neo4j에 아래 쿼리 요청 → 결과 반환 후 캐시
- 정기 배치: GDS로 상위 후보(예: Top-100) 계산 → 개인화 랭킹 적용 → Redis/DB에 저장
실전용 저지연 Cypher (캐시 미존재 시 대체 쿼리)
/* 1) 동일 제품을 구매한 다른 사용자가 구매한 상품 추천 (간단, 캐시 백업용) */
MATCH (u:User {user_id:$userId})-[:PURCHASED]->(p:Product)
MATCH (p)<-[:PURCHASED]-(other:User)-[:PURCHASED]->(rec:Product)
WHERE NOT (u)-[:PURCHASED]->(rec)
RETURN rec.product_id AS id, rec.title AS title, count(*) AS score
ORDER BY score DESC LIMIT 20;
/* 2) 카테고리·최근성 가중치 추가(간단 가중치 적용 예) */
MATCH (u:User {user_id:$userId})-[r:PURCHASED]->(p:Product)
WITH u, collect(p) AS myProducts
MATCH (u)-[:PURCHASED]->()<-[:PURCHASED]-(other:User)-[r2:PURCHASED]->(rec:Product)
WHERE NOT (u)-[:PURCHASED]->(rec)
WITH rec, count(*) AS cfScore, sum(CASE WHEN r2.ts > datetime().epochMillis - 30*24*3600*1000 THEN 2 ELSE 1 END) AS recentBoost
RETURN rec.product_id AS id, rec.title AS title, (cfScore * 0.7 + recentBoost * 0.3) AS score
ORDER BY score DESC LIMIT 20;
운영 팁: Neo4j에 직접적인 복잡·무거운 쿼리를 자주 호출하면 비용이 커집니다. 가능한 후보 생성은 배치·GDS에서 처리하고, 실시간 랭킹·필터만 API 레이어에서 수행하세요.
댓글 없음:
댓글 쓰기