Airflow 실습03

Elastic search 질의

  • 실무 예제로 배우는 데이터 공학 83p

관리자 권한으로 실행 : Ubuntu

→ 경로 이동 : …airflow/

source venv/bin/activate

code .

→ VSCord가 자동 실행된다

→파일 생성 : e_query.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pandas as pd
from pandas.io.json import json_normalize
from elasticsearch import Elasticsearch

# Elasticsearch 객체 생성
es = Elasticsearch()

# 일래스틱서치에 보낼 문서 본문(질의 요청) JSON 객체를 만든다.
# Matchall 검색 사용
doc = {"query" : {"match_all": {}}}
res = es.search(index="users", body = doc, size = 500)
# print(res['hits']['hits'])

# 루프로 문서를 훑으면서 각 문서의 _source 필드만 출력한다.
# for doc in res['hits']['hits']:
# print(doc['_source'])

# 질의 결과를 pandas DataFrame에 넣는 것도 가능
df = json_normalize(res['hits']['hits'])
print(df.head())
print(df.info())

print(df['_source.city'].value_counts())

postgreSQL → Elastic search 데이터 전송

  • 교재 88p
  • Elastic search 가동된 상태에서 진행

→ 선행 학습 링크 참고 : postgreSQL 실습 (notion.so)

  • pgAdmin 준비된 상태에서 진행

→ 다음과 같이 출력되는 상태여야 한다.

Untitled

  • VSCord 에서 작업
  • 교재 88p

dags 폴더 아래에 파일 생성

→ 파일 생성 : airflodb.py

→ 코드 작성

import datetime as dt

from datetime import timedelta

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator

import pandas as pd

import psycopg2 as db

from elasticsearch import Elasticsearch

print("Hello")

→ 경로 이동 : (venv) kmk3593@DESKTOP-LNQ780K:/mnt/c/airflow-test/dags$

→ 실행

→ Hello 가 출력되었으므로 성공.

Untitled

  • 코드 추가 작성
  • 다음 내용을 airflodb.py에 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

# queryPostgresql 지정
def queryPostgresql():
conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
print("DB connecting....", conn)

# 데이터 추출
df = pd.read_sql("select name, city from users", conn)
df.to_csv("postgresqldata.csv")
print("----Data Saved----")

# insertElasticSearch
def insertDataElasticsearch():

# Elastic 인스턴스 생성
es = Elasticsearch()

# 데이터 불러오기
df = pd.read_csv("postgresqldata.csv")
for i, r in df.iterrows():
doc = r.to_json()
res = es.index(
index="frompostgresql"
, doc_type="doc", body=doc
)
print(res)

# DAG를 위한 인수들을 지정
default_args = {
'owner' : 'human',
'start_date' : dt.datetime(2022, 4, 18),
'retries' : 1,
'retry_delay': dt.timedelta(minutes = 5)
}

with DAG('MyDBdag',
default_args = default_args,
schedule_interval = timedelta(minutes=5), # '0 * * * * ',
) as dag:

getData = PythonOperator(
task_id = "QueryPostgreSQL"
, python_callable=queryPostgresql
)

insertData = PythonOperator(
task_id = "InsertDataElasticsearch"
, python_callable = insertDataElasticsearch
)

getData >> insertData
  • Airflow 가동

→ 저장 후 실행

python3 airflodb.py

→ airflow 실행

airflow db init

→(재시도할 경우, 실행 : airflow db reset )

→ Terminal 2개 준비하고 다음 명령 실행

airflow webserver -p 8080

airflow scheduler

→ 다음 주소로 진입

http://localhost:8080/

→ Dags

→ 활성화 : MyDBdag

→ 더블 클릭 : MyDBdag

Untitled

→ Tree

→ Update

→ 다음과 같이 출력되면 성공

Untitled

  • Reference : 실무 예제로 배우는 데이터 공학

Airflow 실습02

데이터베이스를 위한 아파치 에어플로 데이터 파이프라인 구축

실무 예제로 배우는 데이터 공학 87 ~ 91p

관리자 권한으로 실행 : ubuntu

→ elasticsearch 가동하기

Untitled

→ Kibana 가동하기

Untitled

  • 경로 이동, 가상 환경 진입

cd ..cd ..cd mnt/c/airflow-test

source venv/bin/activate

  • 교재의 elastic search 버전을 참고하여 설치

pip3 install elasticsearch==7.17.2

Untitled

  • 교재 80p
  • 일단 vi 대신에 code . 를 사용한다.

code .

( 안 될 경우, Ubuntu를 다시 시작한다)

→ 코드 실행 시, VSCord가 자동으로 시작된다

Untitled

  • 폴더 생성

→ 폴더 생성 : chapter04

→ 파일 생성 : e_search.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

doc={"name": fake.name(),"street": fake.street_address(), "city": fake.city(),"zip":fake.zipcode()}

res=es.index(index="users",doc_type="doc",body=doc)
print(res)

doc={"query":{"match":{"_id":"pDYlOHEBxMEH3Xr-2QPk"}}}
res=es.search(index="users",body=doc,size=10)
print(res)
  • 가상환경 가동 후 실행

→ 저장

→ 터미널

source venv/bin/activate

cd chapter04/

python3 e_search.py

  • 교재 81p

→ 파일 작성 : e_search02.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

actions = [
{
"_index": "users",
"_type": "doc",
"_source": {
"name": fake.name(),
"street": fake.street_address(),
"city": fake.city(),
"zip":fake.zipcode()}
}
for x in range(998) # or for i,r in df.iterrows()
]

response = helpers.bulk(es, actions)
print(response)

→ 저장 후 실행

python3 e_search02.py

→ 다음과 같이 (998,[]) 출력되면 성공

Untitled

  • Kibana 페이지 실행

→ 주소창에 입력 : localhost:5601/

→ 메뉴바

→ Stack Management

Untitled

→ Index Patterns

Untitled

→ Create index pattern

→ 이름 : users

Untitled

→ Create index pattern

Untitled

→ 햄버거 메뉴바 열기

→ Discover

Untitled

→ 앞에서 추가한 index의 문서를 확인할 수 있다.

Untitled

데이터 저장소

  • RDBMS

— 종류 : Oracle, PostgreSQL, MySQL, 빅쿼리(구글),…

— 표준 SQL (하나를 잘 알면, 거의 비슷!)

  • NoSQL

— 종류 : Elasticsearch, 몽고 DB (무료 버전)

어려운 것 조회 하는 방법이 RDMBS ≠ NoSQL 다름 (완전 다름!)

Airflow 재설치 및 데이터 파이프라인 구축

Setting up Apache-Airflow in Windows using WSL2 - Data Science | DSChloe

Airflow 데이터 파이프라인 구축 예제 - Data Science | DSChloe

  • 체크포인트
  1. 가상환경을 만들 수 있는냐 (virtualenv 라이브러리)

  2. 경로 이동이 자유로운가? cd 사용

  3. 환경 변수를 이해하고 잡을 수 있는가?

    vi 편집기를 자유자재로 쓸 수 있는가?

  4. 파이썬 라이브러리를 가상환경에 자유자재로 설치 할 수 있는가?

  5. 가상 환경에 자유롭게 출입할 수 있는가?

  • 사전 준비

VSCord 의 airflow.cfg 에서 진행

→ 내용 변경 : load_examples = TrueFalse

→ c드라이브 → airflow_test → dags와 chapter03, chapter04 를 배경화면에 빼둔다.

Untitled

  • airflow-test 내용물 삭제

Ubuntu 의 airflow 경로에서 진행

deactivate

sudo rm -rf *

  • 다시 가상환경 생성

virtualenv venv

ls

  • 필요한 내용이 작성되어 있는지 확인

vi ~/.bashrc

→ 내용 확인 : export AIRFLOW_HOME=/mnt/c/airflow-test

source ~/.bashrc

echo $AIRFLOW_HOME

pwd

  • 가상환경 on

source venv/bin/activate

  • 라이브러리 설치

pip3 install 'apache-airflow[postgres, slack, celery]'

  • db 설정

airflow db init

  • 계정 등록
  • firstname이 실행 결과에 영향을 주는가

airflow users create --username airflow --password airflow --firstname evan --lastname airflow --role Admin --email your_email@some.com

  • VSCord 에서 진행

→ 폴더 생성 : ( file → open folder → airflow-test )

→ airflow.cfg 파일

→ 내용 변경 : load_examples=TrueFalse

Untitled

  • Ubuntu 에서 진행

airflow db reset

→ 가상 환경 상태에서 다음 코드 실행

airflow webserver -p 8080

Untitled

→ ctrl + c 로 빠져나온다.

데이터 파이프라인 구축

개요

  • 이번에는 CSV-JSON으로 데이터를 변환하는 파이프라인을 구축하도록 한다.

Step 01. Dags 폴더 생성

  • 프로젝트 Root 하단에 Dags 폴더를 만든다.

    • dags 폴더를 확인한다.
  • dags 파일 생성

mkdir dags

ls

Step 02. 가상의 데이터 생성

  • 라이브러리 설치

→ 가상 환경에서 진행

pip3 install faker pandas

  • 폴더, 파일 생성

mkdir data

cd data

vi step01_writecsv.py

→ 코드 작성.

+ 앞으로는 이런 방식으로 코드를 작성한다.

+실무에서 필요한 습관이다.
  • faker 라이브러리를 활용하여 가상의 데이터를 생성한다. (파일 경로 : data/step01_writecsv.py)

Untitled

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from faker import Faker
import csv
output=open('data.csv','w')
fake=Faker()
header=['name','age','street','city','state','zip','lng','lat']
mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
mywriter.writerow([fake.name(),
fake.random_int(min=18, max=80, step=1),
fake.street_address(),
fake.city(),
fake.state(),
fake.zipcode(),
fake.longitude(),
fake.latitude()])
output.close()
  • 코드를 실행한다.
  • VSCord에서 data.csv 파일이 생성되어야 한다.

python3 step01_writecsv.py

ls

cat data.csv

Step 03. csv2json 파일 구축

  • 이번에는 CSV와 JSON 변환 파일을 구축하는 코드를 작성한다. (파일 경로 : dags/csv2json.py)\
  • 주요 목적 함수 csvToJson()의 역할은 data/data.csv 파일을 불러와서 fromAirflow.json 파일로 변경하는 것이다.
  • DAG는 csvToJson 함수를 하나의 작업으로 등록하는 과정을 담는다. 작업의 소유자, 시작일시, 실패 시 재시도 횟수, 재시도 지연시 시간을 지정한다.
  • print_starting >> csvJson 에서 >> 는 하류 설정 연산자라고 부른다. (동의어 비트 자리이동 연산자)

cd ..

cd dags

vi csv2json.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd

def csvToJson():
df=pd.read_csv('data/data.csv')
for i,r in df.iterrows():
print(r['name'])
df.to_json('fromAirflow.json',orient='records')

default_args = {
'owner': 'human',
'start_date': dt.datetime(2020, 3, 18),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5), # '0 * * * *',
) as dag:

print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the CSV now....."')

csvJson = PythonOperator(task_id='convertCSVtoJson',
python_callable=csvToJson)

print_starting >> csvJson
  • 코드를 실행한다.
  • VSCord에서 json 파일이 생성되어야 한다.

python3 csv2json.py

Step 04. Airflow Webserver 및 Scheduler 동시 실행

  • 이제 웹서버와 스케쥴러를 동시에 실행한다. (터미널을 2개 열어야 함에 주의한다.)

VSCord 에서 WSL 터미널을 2개 띄운다.

airflow webserver -p 8080

airflow scheduler

Untitled

  • error 발생할 경우 대처

airflow.cfg의 endproint_url = 8080 체크

airflow db reset

airflow webserver -p 8080

airflow scheduler

→ 이 과정을 반복

→ 그래도 안 되면 airflow 지우고 다시 시작

이제 WebUI를 확인하면 정상적으로 작동하는 것을 확인할 수 있다

Step 05. 작업 결과물 확인

  • 최초 목적인 fromAirflow.json 로 정상적으로 변환되었는지 확인하도록 한다.

    • fromAirflow.json 파일이 확인된다면, 정상적으로 작업이 끝난 것이다.

    ls

    → 다음 내용이 출력되면 성공

    airflow-webserver.pid airflow.cfg airflow.db dags data fromAirflow.json logs venv webserver_config.py

    human@DESKTOP-V24TVMS:/mnt/c/airflow$ export AIRFLOW_HOME="$(pwd)"

    human@DESKTOP-V24TVMS:/mnt/c/airflow$ echo $AIRFLOW_HOME

  • Reference : 실무 예제로 배우는 데이터 공학

Apache Nifi Install

Apache airflow

Apache-Airflow in windows 설치

• Windows WSL2에서 airflow를 설치한다.

Step 1. Install pip on WSL

• c드라이브에 들어간다.

관리자 권한으로 실행 : Ubuntu

cd..

cd..

cd mnt/c

• 폴더를 만든다

mkdir airflow-test

ls

cd airflow-test/

• pip를 설치한다.

sudo apt-get update && sudo apt-get upgrade

sudo apt install python3-pip

Step 2. Install virtualenv package

• virtualenv 라이브러리를 설치한다.

sudo pip3 install virtualenv

Step 3. Create a virtual environment

• 이제 가상환경을 생성한다.

virtualenv venv

• 가상환경에 접속을 한다.

→ airflowtest 경로에서 해야 한다.

source venv/bin/activate

→ 경로 확인

pwd

• 이번에는 .bashrc 파일을 수정한다.

• 파일을 열고, 맨 밑줄에 다음과 같은 코드를 추가한다.

vi ~/.bashrc

→ 내용 추가 : export AIRFLOW_HOME=/mnt/c/airflow-test

→ ESC → :wq 하여 저장

• 수정된 코드를 업데이트 하기 위해서는 아래와 같이 반영한다.

source ~/.bashrc

• 실제로 코드가 반영되었는지 확인하기 위해서는 다음과 같이 확인해본다.

• 다시 가상환경에 접속하고 수행

source venv/bin/activate

echo $AIRFLOW_HOME

→ /mnt/c/airflow-test 출력되면 성공.

Step 4. Apache Airflow 설치

• PostgreSQL, Slack, Celery 패키지를 동시에 설치하는 코드를 작성한다.

sudo apt-get update && sudo apt-get upgrade

pip3 install 'apache-airflow[postgres, slack, celery]'

• airflow 실행을 위해 DB 초기화를 해줘야 한다.

airflow db init

• 실제로 잘 구현이 되었는지 확인하기 위해 webserver를 실행한다

airflow webserver -p 8081

→ port 번호 8081을 뜻한다.

• 그리고, 해당 링크 http://localhost:8081/login/ 에 접속하면 아래와 같은 화면이 나타난다.

Untitled

• 그런데, 여기에서 문제는 username을 생성하지 않았다. 따라서, username을 추가하도록 한다

→ ctrl + c 로 빠져나온다.

airflow users create --username airflow --password airflow --firstname evan --lastname airflow --role Admin --email your_email@some.com

• 다시 웹서버 실행

airflow webserver -p 8081

• 해당 링크 http://localhost:8081/login/ 에 접속

• 다음 정보로 로그인한다

→ id : airflow

password : airflow

→ 다음 화면이 나오면 성공.

Untitled

설정완료 후에 가상환경 키는 법

ubuntu

→ cd .. → cd .. → cd mnt/c → cd airflow-test

source venv/bin/activate

Airflow 재설치 및 데이터 파이프라인 구축