프로젝트를 진행하면서 Python을 통해 Hive를 컨트롤하는 니즈가 발생했다.
sqlalchemy를 활용하여, create select insert를 구현했다.
sqlalchemy 란?
Python에서 사용가능한 ORM(Object-relational maping)이다.
즉, 데이터베이스의 데이터를 Object필드와 매핑해준다는 것이다. 사실 DB의 스키마와 Object는 서로 기존부터 호환가능성을 두고 만들어진 것이 아니기에 불일치가 발생하는데,, ORM은 객체 간의 관계를 바탕으로 SQL문을 자동으로 생성하여 이러한 불일치를 해결한다.
이를 통해서 쿼리를 실행할 수도 있고, Python의 Dataframe과 데이터베이스간을 연결할 수 있다.
Hive-Python 연동
1. select
from sqlalchemy import create_engine
engine = create_engine('hive://localhost:10000')
# select문
df = pd.read_sql("select * from db.table", engine)
2. Create
from sqlalchemy import create_engine
engine = create_engine('hive://localhost:10000')
# Create
db = db_name
table = table_name
schema = '(`val1` string, ' \
'`val2` Date, ' \
'`val3` int) '
query = ('''CREATE TABLE IF NOT EXISTS %s.%s %s
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
''') % (db, table, schema)
engine.execute(query)
# PARTITIONED BY: 파티셔닝을 통해서 데이터를 분할한다 (데이터 양을 제한하여 성능을 향상시킬수 있다)
# STORED AS PARQUET: 칼럼기반으로 데이터를 저장한다
3. Insert
from sqlalchemy import create_engine
engine = create_engine('hive://localhost:10000')
# Insert문
df.to_sql(schema=db, name=table, con=engine,
index=False, method='multi', if_exists='append')
# index: df의 index를 db의 칼럼으로 추가할지 여부
# method: multi일 경우, 다수의 행을 대상으로 한다
# if_exists: 만약 table이 존재할 경우, 뒤에 데이터를 붙인다
이를 클래스화하면 아래와 같다.
참고
https://moons08.github.io/programming/Hive_Partition/
https://moons08.github.io/programming/Hive_Table/#parquet
https://ulfrid.github.io/python/python-sqlalchemy/
반응형
'Engineering 💻 > Hadoop' 카테고리의 다른 글
[Spark] Exception while deleting Spark temp dir 에러 해결 (0) | 2022.02.18 |
---|---|
[Hadoop] Spark 동작 단계 (0) | 2022.01.18 |
[Hadoop] 오케스트레이션 (feat. Oozie, Airflow) (0) | 2022.01.13 |
[Hadooop] 분석용 SQL 엔진 (feat. Hive, Impala, Presto) (0) | 2022.01.12 |
[Hadoop] 연산 프레임워크 (feat. MapReduce, Spark, Flink) (0) | 2022.01.11 |