Legacy - 부트캠프/[부트캠프] 회고

[데이터 엔지니어링 부트캠프]10월 1주차 회고

포리셔 2023. 10. 8. 14:05

좋았던 점

  • 연휴에 푹 쉬고 와서 그런지 도커 실행에 있어 트러블슈팅은 비교적 안정권에 들어온 모양입니다. 물론 후술할 아쉬웠던 점 때문에 도커가 통으로 날아가버린 수강생도 있어서 이 점은 좀 두고봐야 할 것 같습니다.
  • 머신러닝/딥러닝 파트의 스터디를 소집했습니다. 수요일 날 최초로 소집해서 간단한 OT를 한 뒤 자료를 배포했는데 일단 아직까지는 반응이 괜찮은 거 같습니다. 계속 만들어야 하는데 잘 만들 수 있겠죠...??
  • 이번에 소개받은 엘라스틱서치(ElasticSearch) 부분은 CLI 환경과 코딩에 지친 수강생들에게는 꽤 반가울 수 있을 것 같습니다. 가동만 되면 웹 브라우저 상에서 GUI 환경으로 데이터 시각화나 해석이 가능하기 때문이죠. 실무에서도 쓰인다고 하니 사용하기까지의 진입장벽만 넘어서면 꽤 유용하게 써먹어 볼 수 있을 것 같습니다. 다만...
    (아쉬웠던 점에서 이어집니다.)

아쉬웠던 점

  • 도커로 환경을 워낙 이것저것 만들어서 구축하다 보니 노트북의 사양이 슬슬 감당하기 어려울 지경입니다. 작업 관리자를 켜보니 16GB 램으로는 빅데이터 용 툴(특히 스쿱(sqoop))들이 메모리 부족으로 인해 컴퓨터가 랙이 걸리고, SSD는 256GB 용량이 도커 환경 구축만으로 꽉 찰 지경입니다. 이 글 쓰는 현재 시점 기준으로 제 노트북은 6GB 정도 남았는데, 센터에서 제공해준 거라 추가 용량 증설은 조금 어려울 거 같고... 난감하군요. 어떻게 용량을 확보할 수 있는지 문의를 해봐야겠습니다.
  • 제공받은 csv 파일의 컬럼과 실습하는 데 사용하는 bash 명령어 라인의 컬럼 명이 일치하지 않습니다. 일견 사소하다고 볼 수도 있겠지만, 실무에서 충분히 문제삼을 수 있는 부분이고 그동안의 업보가 있어서 다시 신경이 날카로워지곤 했습니다.

배운 점

에어플로우 도커 설정

지난 주에 이어서 바이낸스 API를 통해 가상화폐 가격을 조회한 후 Prophet 라이브러리를 통해 미래의 가격 예측을 하는 과정을 에어플로우를 통해 파이프라인 구축하는 실습을 진행했습니다.

  • 우분투를 관리자 권한으로 실행한 뒤 root 계정에 접속합니다. 이후, VS Code를 관리자 권한으로 실행한 뒤 SSH에 접속해서 root 디렉터리에 도커파일을 작성합니다.
# apache/airflow:2.7.1 버전 다운로드
FROM apache/airflow:2.7.1
# root 계정으로 실행
USER root
# 리눅스 업데이트
RUN apt-get update
# 필요한 라이브러리 설치
RUN apt-get install libc-dev -y
RUN apt install git -y
RUN apt-get install build-essential -y
RUN apt-get install python-dev -y

# airflow 계정으로 실행
USER airflow
# pip 업데이트
RUN pip3 install pip --upgrade
# MySQL 라이브러리 설치
RUN pip3 install PyMySQL
# prophet 관련 라이브러리 설치
RUN pip3 install scikit-learn
RUN pip3 install plotly
RUN pip3 install pystan==2.19.1.1
RUN pip3 install prophet
# 라이브러리 설치 테스트
RUN python3 -c "from prophet import Prophet;print(Prophet);"
  • 우분투 터미널에 docker login을 입력한 후 docker hub에 가입한 아이디와 비밀번호를 입력한 뒤, 만약을 대비해 현재 실행 중인 컨테이너를 모두 종료합니다. 여기서는 docker stop $(docker ps -ap) 명령어를 실행하면 됩니다.
  • 도커 이미지를 생성합니다. (docker build -t airflow_custom . -f airflow-dockerfile)
  • 도커컴포즈 파일 수정: 빌드가 성공적으로 끝났다면 지난 주에 작성한 도커컴포즈 파일을 수정합니다. docker-compose.yaml 파일을 엽니다. image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.1} 행을 image: airflow_custom으로 바꿔주세요. 우리가 빌드한 airflow_custom 이미지를 사용하기 위합입니다!
  • 에어플로우 실행 파일 수정: airflow_install.sh 파일을 수정합니다. curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/docker-compose.yaml'을 주석 처리합니다. 이 또한 앞에서 수정한 도커컴포즈 파일을 사용하기 위함입니다. 여기까지 마쳤다면 터미널에서 root 디렉터리에 있는지 확인한 후 ./airflow_install.sh를 입력해 에어플로우를 실행합니다.데이터 수집 및 저장 구현
  • MySQL 외부 접속 허용: 에어플로우 도커 파일은 가상 이미지이기 때문에 MySQL이 설치되어 있는 윈도우와 다른 컴퓨터로 인식됩니다. 설상가상으로 MySQL은 기본적으로 다른 컴퓨터에서 접속하지 못하게 되어있기 때문에 타 컴퓨터에서도 접속이 가능하도록 설정을 만져야 합니다. MySQL 워크벤치를 켜고 새 탭을 연 뒤 아래 쿼리를 실행합니다. 쿼리를 실행하면 root 계정이 localhost로 설정되어 다른 컴퓨터에서 접속할 수 없음을 알립니다.
  • 다른 컴퓨터에서 접속이 가능하도록 하려면 아예 새로운 계정을 파야 합니다. 아래 쿼리를 이용해 계정 생성을 합니다. 생성이 끝나면 아래와 같이 host가 %인 root 계정이 생성됩니다. 다른 컴퓨터에서 접속 가능하다는 뜻입니다.
-- 원격 접속 가능 계정 생성
CREATE USER 'root'@'%' IDENTIFIED BY '1234';
-- 원격 접속 권한 설정
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
-- 추가한 내용 저장
FLUSH PRIVILEGES;
  • 컴퓨터 아이피 확인: 명령 프롬프트를 관리자 권한으로 실행한 후 ipconfig를 입력해 아이피 주소를 조회합니다.
  • 에어플로우 DAG 구현: 다시 VS Code로 돌아갑시다. /root/dags 디렉터리를 열고 get_binance02.py를 작성합니다. 구조는 지난 주에 작성한 get_binance01.py와 유사하지만, 시작 시간이 오늘 날짜로 바뀌었고, 데이터베이스 연결을 위해 pymysql.connect 클래스에 방금 확보한 아이피 주소와 MySQL 비밀번호를 집어넣어 객체를 생성하는 차이점이 있습니다. 또한 이미 확보한 DB 데이터는 업데이트만 하도록 쿼리를 추가로 구성했습니다.DAG 실행여기까지 진행한 후 DAG 관리 창에서 get_binance02 DAG를 실행합니다. 로그 탭에서 DAG의 명령어들이 정상적으로 실행된 것을 확인하고, MySQL 워크벤치에서도 데이터가 제대로 저장되었는지 확인할 수 있었습니다.
    get_binance02
    MySQL 워크벤치 조회
    같은 방식으로 DAG 파이썬 스크립트에 머신러닝 태스크도 추가해 예측값을 SQL 컬럼에 추가하는 작업도 진행했습니다. 최종적으로 사용된 get_finance02.py의 스크립트는 아래와 같습니다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="get_binance02",
    schedule_interval="@hourly",
    start_date=pendulum.datetime(2023, 10, 4, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def get_bitcoin():
        import requests
        from datetime import datetime, timezone, timedelta
        import time
        import pytz
        import pymysql

        # 현재 날짜와 시간을 가져옴 (세계 표준시 UTC)
        now = datetime.now()
        print("now =", str(now))

        # 현재 날짜와 시간을 분까지 리턴
        end_date_time = str(now)[:16]
        print("end_date_time =", end_date_time)

        # 현재 시간으로부터 1시간 전 리턴
        one_hour_ago = (now - timedelta(hours=1))
        print("one_hour_ago =", one_hour_ago)

        # 현재 시간 1시간 전의 16번째 문자열까지 리턴
        start_date_time = str(one_hour_ago)[:16]
        print("start_date_time =", start_date_time)

        # 가상화폐 정보를 가져올 바이넌스 API 주소
        url = "https://api.binance.com/api/v3/klines"
        # 가상화폐 종류 BTCUSDT(비트코인), ETHUSDT(이더리움) 등
        coin_name = "BTCUSDT"

        # 문자열 형태의 end_date_time을 date_time으로 변환 (변환 결과는 1초 단위)
        end = int(time.mktime(datetime.strptime(end_date_time, '%Y-%m-%d %H:%M').timetuple())) * 1000
        print("end =", end)

        start = int(time.mktime(datetime.strptime(start_date_time, '%Y-%m-%d %H:%M').timetuple())) * 1000
        print("start =", start)

        params = {
            'symbol':coin_name,
            'interval': '1m',
            'limit':1000,
            'startTime':start,
            'endTime':end
        }

        db = pymysql.connect(
            host='yourIpAddress',
            port=3306,
            user='root',
            passwd='yourPassword',
            db='coin_db',
            charset='utf8'
        )

        cursor = db.cursor()

        while start < end:
            print("start =", start // 1000)

            params['startTime'] = start
            result = requests.get(url, params=params)
            coin_list = result.json()

            if not coin_list:
                break

            for coin in coin_list:
                # 세계 표준시(UTC)를 한국 표준시(KST)로 변환할 객체
                timezone = pytz.timezone('Asia/Seoul')

                open_time=datetime.fromtimestamp(coin[0] // 1000, tz=timezone)
                print("open_time =", open_time)
                open_price = coin[1]
                print("open_price =", open_price)
                high_price = coin[2]
                print("high_price =", high_price)
                low_price = coin[3]
                print("low_price =", low_price)
                close_price = coin[4]
                print("close_price =", close_price)
                volume = coin[5]
                print("volume =", volume)
                print("=" * 100)

                count_sql = "select count(*) from coin_tbl where open_time=%s"
                cursor.execute(count_sql, (open_time))

                count = cursor.fetchall()[0][0]
                print("count =", count)

                if count < 1:
                    insert_sql = "insert into coin_tbl (open_time, open_price, high_price, low_price, close_price, volume, symbol) "
                    insert_sql += " values (%s, %s, %s, %s, %s, %s, %s);"

                    cursor.execute(insert_sql, (open_time, open_price, high_price, low_price, close_price, volume, coin_name))
                    db.commit()
                else:
                    update_sql = "update coin_tbl set open_price=%s, high_price=%s, low_price=%s, close_price=%s, volume=%s, symbol=%s where open_time=%s;"

                    cursor.execute(update_sql, (open_price, high_price, low_price, close_price, volume, coin_name, open_time))
                    db.commit()
            # coin_list[-1][0]: 코인 리스트 마지막 행 0번째 열 → 수집한 마시막 시간이 int로 저장되어 있음
            # 단위는 천 분의 1초(0.001초)
            # 60000 → 60초 후 시간을 시작 시간으로 데이터 수집    
            start = coin_list[-1][0] + 60000
            time.sleep(1)

        db.close()

    py_t1 = PythonOperator(
        task_id = 'py_t1004_01', # 실행할 task id
        python_callable=get_bitcoin # 실행할 함수
    )

    # 가상화폐 가격을 예측해서 예측값을 DB에 저장
    def get_predict():
        import pymysql
        from prophet import Prophet
        import pandas as pd

        db = pymysql.connect(
            host='192.168.0.109',
            port=3306,
            user='root',
            passwd='1234',
            db='coin_db',
            charset='utf8'
        )

        cursor = db.cursor()

        sql = "select t1.* from (select open_time as ds, close_price as y from coin_tbl order by open_time desc limit 14400) as t1 order by t1.ds asc;"
        bitcoin_df = pd.read_sql(sql, db)

        prophet = Prophet(
            seasonality_mode='multiplicative', # 트렌드 반영
            yearly_seasonality=True, # 연간 트렌드 반영
            weekly_seasonality=True, # 주간 트렌드 반영
            daily_seasonality=True, # 일간 트렌드 반영
            changepoint_prior_scale=0.5 # 트렌드 반영 비율
        )

        prophet.fit(bitcoin_df)

        # 1분씩 데이터 60개 예측하도록 설정 (실제 예측으로 마지막 저장된 시간 이후 1시간 이후 비트코인 가격 예측)
        future_data = prophet.make_future_dataframe(periods=60, freq='min')

        # 가격 예측
        forecast_data = prophet.predict(future_data)
        # future_data의 행의 수 리턴
        df_count = len(future_data)

        for index in range(df_count):
            # 실제 비트코인 가격 정보가 저장된 bitcoin_df의 index 행 ds 열의 데이터 리턴 (비트코인 날짜와 시간)
            open_time = future_data.loc[index, "ds"]
            print("open_time =", open_time)

            # 예측값이 저장된 forecast_data의 index 행 ds 열의 데이터 조회 (비트코인 예측값)
            predic_price = forecast_data.loc[index, "yhat"]
            print("predic_price =", predic_price)
            # open_time과 일치하는 시간의 레코드의 개수 조회 쿼리 작성
            count_sql = "select count(*) from coin_tbl where open_time=%s ;"
            # open_time과 일치하는 시간의 레코드의 개수 조회 쿼리 실행하도록 설정
            cursor.execute(count_sql, (open_time))
            # open_time과 일치하는 시간의 레코드의 개수 조회 쿼리 실행
            count = cursor.fetchall()[0][0]
            print("count =", count)

            if count >= 1: # count => open_time(비트코인 날짜와 시간)이 일치하는 행의 개수가 1개 이상(즉, 존재함)
                # open_time과 일치하는 시간 predic_price is null: 예측값이 null인 레코드의 개수 조회 쿼리 작성
                predic_count_sql = "select count(*) from coin_tbl where open_time=%s and predic_price is null;"
                cursor.execute(predic_count_sql, (open_time))
                predic_count = cursor.fetchall()[0][0]
                print("predic_count =", predic_count)

                if predic_count >= 1: # open_time과 일치하는 시간 predic_price is null: 예측값이 null인 레코드의 개수가 1개 이상(즉, 존재함)
                    # coin_tbl 테이블에 예측값을 update할 SQL 쿼리 생성 (기존에 null 값임)
                    update_sql = "update coin_tbl set predic_price=%s where open_time=%s;"
                    cursor.execute(update_sql, (predic_price, open_time))
                    db.commit()
            else: # open_time과 일치하는 시간에 레코드 개수가 1개 미만 (즉, 레코드 없음)
                # coin_tbl 테이블에 예측값을 insert할 SQL 쿼리 생성 (기존에 null 값임)
                insert_sql = "insert into coin_tbl (open_time, predic_price) values (%s, %s);"
                cursor.execute(insert_sql, (open_time, predic_price))
                db.commit()

            print("=" * 100)

        db.close()

    py_t2 = PythonOperator(
        task_id='py_t1004_02', # 실행할 task id
        python_callable=get_predict # 실행할 함수
    )

    py_t1 >> py_t2

아파치 스쿱(Apache Sqoop)

RDB와 하둡 사이에서 데이터 출입/이동을 지원하는 툴인 스쿱을 사용했습니다. 과거에 사용했던 주가 테이블 데이터를 이용했습니다.

  • 데이터의 출입을 하기에 앞서 먼저 아래 명령어를 터미널에 입력해 한글 설정을 했습니다.
  • localedef -f UTF-8 -i ko_KR ko_KR.UTF-8 export LC_ALL=ko_KR.UTF-8 LC_ALL=ko_KR.UTF-8 bash
  • 데이터 임포트(import) 및 익스포트(export): MySQL에서 하이브로 데이터를 임포트해오는 명령어와, 반대로 하이브에서 MySQL로 데이터를 익스포트하는 명령어입니다. 참고로 데이터 익스포트 시에는 MySQL에 미리 테이블을 생성하고, 그 생성한 테이블과 하이브에 저장된 테이블의 컬럼명과 자료형이 일치해야 내보낼 수 있습니다.
sqoop import \
--connect 'jdbc:mysql://{yourIpAddress}:3306/coin_db?useUnicode=true&serverTimezone=Asia/Seoul' \
--username root \
--password {yourPassword} \
--fields-terminated-by "," \
--hive-import --create-hive-table --hive-table sqoop_coin_tbl -m 1 \
--query 'SELECT num,open_time,open_price,high_price,low_price,close_price,volume,symbol,predic_price FROM coin_tbl WHERE $CONDITIONS' \
--target-dir  hdfs://localhost:9000/coin_data
sqoop export \
--connect 'jdbc:mysql://{yourIpAddress}:3306/coin_db?useUnicode=true&serverTimezone=Asia/Seoul' \
--username root \
--password {yourPassword} \
--table mysql_stock \
--export-dir hdfs://localhost:9000/my_stock/

아파치 피그(Apache Pig)

빅데이터를 빠르게 분석하기 위해 만들어진 툴인 피그를 사용했습니다. 일반적으로 피그 단독으로 사용하는 경우가 잘 없기 때문에, 기본적인 피그 문법을 매운 맛으로 실습한 이후로는 엘라스틱 서치(ElasticSearch)와 키바나(Kibana)와 함께 사용했습니다.

  • 피그 실행 시: pig -x local로 실행합니다. 에러 없이 실행되었다면 grunt라는 이름이 출력되어야 합니다.
    • 저장된 데이터 출력: dump A;와 같이 사용합니다.
    • 반복문으로 A의 데이터를 B에 저장합니다. B = foreach A generate order_date, delivery_date, reservation, delivery_memo, customer_ip, customer_gender, customer_age, delivery_point, customer_address, shopping_mall, score, product_classfication, amount, payment;
    • 이제 데이터가 저장된 변수 B를 es_shopping이라는 이름으로 엘라스틱 서치와 데이터 공유를 진행합니다.
STORE B INTO 'es_shopping'
USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes.wan.only=true',
'es.port=9201',
'es.nodes=localhost');

엘라스틱 서치(ElasticSearch)

앞서 피그를 이용해 공유한 데이터를 엘라스틱 서치에서 GUI 환경으로 시각화 및 분석을 진행할 수 있습니다. 엘라스틱 서치의 기본 포트 번호는 5601이므로 http://localhost:5601로 접속할 수 있습니다. RDBMS에서 사용되는 개념을 거의 동일하게 사용하지만, 일부 용어가 판이하게 다른 부분이 있습니다.

RDBMS ElasticSearch Excel
Database Index Excel File
Table Type Sheet
Row Document Row
Column Field Column
Schema Mapping 용어 없음

엘라스틱 서치는 Rest API를 기반으로 데이터를 불러오기 때문에 스프링 부트 때 실습했던 CRUD 구조를 이용해 데이터의 추가, 검색, 수정, 삭제가 가능했습니다. 또한 집계함수의 이용, 정확도 및 range 쿼리를 이용해 특정 범위 내의 값의 추출과 통계적 분석도 가능합니다.

사소한 팁 하나
툴바의 Management 탭 - Dev Tools에서 POST /_sql/translate 기능을 이용하면 SQL 쿼리를 엘라스틱 서치 쿼리로 변환할 수 있었습니다!

쿼리 변환

앞으로 바라는 점

  • 추석 연휴 잘 쉬고 왔는데... 이젠 오랜 지병인 비염이 도졌습니다. 매년 비염으로 고통받는데 올해 유독 힘듭니다. 저 말고도 같은 반에 비염으로 고통받는 영혼들이 속출하고 있으니 다들 필요하면 병원 가서 약도 타 오고 건강 관리 잘 했으면 좋겠습니다.
  • 이 와중에 다음 주에(아마도 화요일, 월요일은 한글날이 하루 쉬니까) 또 미니 프로젝트를 하나 더 한다고 합니다. 아무리 데이터 엔지니어들이 모든 데이터의 도메인을 다 알지는 못한다고 하지만, 그래도 최소한 데이터에 (결측치가 있더라도) 해석이 가능할 정도로는 마련해줘야 할 거 아닌가요? 그래서인지 벌써부터 걱정이 앞서는데...
  • 멘토님들에 따라서 팀이 쪼개질 수 있다는 소식을 얼핏 전해들었습니다. 기껏 친목 다져놨더니 쪼개져서 다른 팀으로 들어가야 된다면... 음... 경우에 따라서는 조금 골치 아픈 상황이 닥칠 수도 있겠습니다.