pyspark 실습02

사전준비

  • git bash로 VSCord에 들어가 터미널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark 실습(1)

  • 가상환경 진입하고 파일 생성

source venv/Scripts/activate

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step04_structype.py

키워드 : Struct Type

구글링 : Spark Struct Type, Spark Struct

참고 링크 :

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from struct import Struct
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# 세션 할당 (필수)
# spark = SparkSession.builder.appName("")
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True),
StructField("movieID", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
toMovieIds = movies_df.groupBy("movieID").count().orderBy(func.desc('count'))

print(movies_df.show(10))

# 세션 종료
spark.stop()

→ 경로 이동 : cd chapter01_get_started

→ 저장 후 실행

python step04_structype.py

→ 다음 테이블이 출력되어야 한다.

Untitled

pyspark 실습(2)

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step05_advancestructype.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

print("Hello")

def loadMovieNames():
movieNames = {}
with codecs.open("ml-100k/u.ITEM", "r", encoding="ISO-8859-1", errors="ignore") as f:
for line in f:
fields = line.split("|")
movieNames[int(fields[0])] = fields[1]
return movieNames

# 세션 할당
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 파이썬 딕셔너리 객체를 Spark 객체로 변환
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True)
, StructField("movieID", IntegerType(), True)
, StructField("rating", IntegerType(), True)
, StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
topMovieIds = movies_df.groupBy("movieID").count()

# 딕셔너리
# key-value
# 키 값을 알면 value 자동으로 가져옴 (movieTitle)
def lookupName(movieID):
return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

# MovieTitle 기존 topMovieIds 데이터에 추가
# 컬럼을 추가
moviesWithNames = topMovieIds.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

final_df = moviesWithNames.orderBy(func.desc("count"))

print(final_df.show(10))

# 세션 종료
spark.stop()

→ 저장 후 실행

python step05_advancestructype.py

→ 다음과 같이 출력된다.

Untitled

pyspark 실습01

사전준비

  • spark on windows 참고하여 세팅
  • 스파크를 설치한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

pyspark 설치

  • git bash를 이용해 폴더를 생성하고 터미널을 연다.

바탕화면 우클릭 : git bash here

mkdir pyspk_project

cd pyspk_project

code .

→ git bash 터미널

Untitled

  • 가상환경 생성 후 pyspark 설치

virtualenv venv

source venv/Scripts/activate

pip install pyspark

Untitled

pyspark 실습_1

  • 폴더 파일 생성

→ 폴더 생성 : chapter01_get_started

→ 파일 생성 : step01_basic.py

→ 코드 작성

import pyspark

print(pyspark.__version__)

→ 저장

→ 경로 이동 : cd chapter01_get_started

→ 실행 : python step01_basic.py

Untitled

  • 이어서 코드작성

→ step01_basic.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import pyspark
print(pyspark.__version__)

from pyspark.sql import SparkSession

# 스파크 세션 초기화
spark = SparkSession.builder.master('local[1]').appName('SampleTutorial').getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

print("rdd Count:", rdd.count())

→ 저장 후 실행

→ 주소창에 입력 : http://localhost:4040/

→ 다음 화면이 출력된다.

  • 교재 278p

Untitled

pyspark 실습_2

  • 슬랙에서 dataset.zip 을 다운로드
  • 압축을 풀고 chapter01_get_started 파일에 복사하여 옮긴다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step02_ratings.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# SparkContext
# RDD

from pyspark import SparkConf, SparkContext
import collections

print("Hello")

def main():
# MasterNode = local
# MapReduce

conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf = conf)

lines = sc.textFile("ml-100k/u.logs")
ratings = lines.map(lambda x: x.split()[2])
print("ratings: ", ratings)

result = ratings.countByValue()
print("result:", result)

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

if __name__ == "__main__":
main()

→ 저장

→ 실행 : python step02_ratings.py

→ 다음 결과가 출력된다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step03_dataloading.py

→ 코드 작성

→ pip install pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Spark SQL 적용

# Spark Session
from pyspark.sql import SparkSession
import pandas as pd

# 스파크 세션 생성
"""
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

# 테이블을 확인하는 코드
print(my_spark.catalog.listDatabases())

# show database
my_spark.sql('show databases').show()

# 현재 DB 확인
my_spark.catalog.currentDatabase()
my_spark.stop()
"""

# CSV 파일 불러오기
spark = SparkSession.builder.master('local[1]').appName('DBTutorial').getOrCreate()
flights = spark.read.option('header', 'true').csv('data/flight_small.csv')
# flights.show(4)

# spark.catalog.currentDatabase()
# flights 테이블을 default DB에 추가함
flights.createOrReplaceTempView('flights')

# print(spark.catalog.listTables('default'))
# spark.sql('show tables from default').show()

# 쿼리 통해서 데이터 저장
query = "FROM Fligths SELECT * LIMIT 10"
query2 = "SELECT * FROM flights 10"

# 스파크에 세션할당
flights10 = spark.sql(query2)
flights10.show()

# Spark 데이터 프레임을 Pandas 데이터 프레임을 변환
pd_flights10 = flights10.toPandas()
print(pd_flights10.head())

→ 저장

→ 실행 : python step03_dataloading.py

Spark on Windows

Spark Installation on Windows 10 - Data Science | DSChloe

사전준비

  • 스파크를 설치하는 과정이다.
  • 사전에 파이썬 3가 설치되어 있어야 한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

자바 다운로드

Spark 다운로드

Untitled

WinRAR 다운로드

  • 이 때, .tgz 압축파일을 풀기 위해서는 WinRAR 을 설치한다.

Untitled

winutils 다운로드

  • 이번에는 스파크가 윈도우 로컬 컴퓨터가 Hadoop으로 착각하게 만들 프로그램이 필요하다.
  • 이전에 받은 spark-3.2.0-bin-hadoob-3.2.tgz 와 버전이 일치하는 것을 선택해야 한다.
    • 3.2.0 버전을 다운로드 받았다.

Untitled

자바 설치 진행

  • C 드라이브에 폴더 생성 : hadoob
  • 다운로드 받은 파일 4개를 C/hadoob 에 복사하여 옮긴다

Untitled

  • 관리자 권한으로 실행 : jdk-8u311-windows-x64

Untitled

  • 계속 Next 버튼 클릭 후, 아래 파일에서 경로를 수정한다. (이 때, Program Files 공백이 있는데, 이러한 공백은 환경 설치 시 문제가 될 수 있다.)
  • Development Tools 선택
  • change 버튼으로 변경을 진행한다.

Untitled

  • c 드라이브 경로로 이동
  • Foldername: jdk 입력
    • 다음 그림과 같아야 한다.

Untitled

  • Java를 다른 폴더에 설치하려 한다.
  • 변경(C)…

Untitled

  • c 드라이브 경로에서 ‘새 폴더 만들기(M)’
  • 폴더 생성 : jre

Untitled

  • 다음과 같이 설치 위치가 지정된다.

Untitled

  • 성공적으로 설치되었다.

Untitled

winrar 설치 진행

  • 관리자 권한으로 실행 : winrar-x64-611

Untitled

  • 기본 설정으로 설치 진행

Untitled

spark 설치 진행

  • Spark 설치를 진행한다.
  • 설치 파일 우클릭 → Extract to “spark-3.2.0-bin-hadoop3.2|”

Untitled

spark 폴더 생성 및 파일 이동

  • 위 과정 이후 폴더가 생성된다.
  • 파일 이동을 하도록 한다.
    • spark-3.2.0-bin-hadoop3.2 폴더 내 모든 파일을 복사한다.
  • 그 후, C드라이브 하단에 spark 폴더를 생성한 후, 모두 옮긴다.

Untitled

log4j.properties 파일 수정

  • C -> sparkconf → [log4j.properties](http://log4j.properties) 파일을 연다.
  • 해당 파일을 메모장으로 연 후, 아래에서 INFO → ERROR 로 변경한다.
    • 작업 실행 시, 출력하는 모든 logs 값들을 없앨 수 있다.
    • 다음과 같이 설정 후 저장

Untitled

winutils 설치 진행

  • C드라이브에서 winutils-bin 폴더를 차례로 생성한다.
  • 다운로드 받은 winutils 파일을 복사하여 옮긴다.

Untitled

  • 이 파일이 Spark 실행 시, 오류 없이 실행될 수 있도록 파일 사용 권한을 얻도록 한다.
    • 이 때에는 CMD 관리자 권한으로 파일을 열어서 실행한다.
  • 관리자 권한으로 실행 : 명령 프롬프트

Untitled

  • 다음 코드들을 시행

cd c:\winutils\bin

winutils.exe chmod 777 \tmp\hive

  • 만약, ChangeFileModeByMask error (3) 에러 발생 시,

    C드라이브 하단에, tmp\hive 폴더를 차례대로 생성을 한다.

Untitled

  • 실행 결과, 에러가 발생했으므로 C드라이브에 폴더를 생성한다.
  • 폴더 생성 : tmp
    • 폴더 생성 : hive
  • 다시 코드를 실행한다.

winutils.exe chmod 777 \tmp\hive

→ 오류없이 실행되었다.

Untitled

환경변수 설정

  • ‘시스템 환경 변수 편집’ 열기

환경 변수(N)..

Untitled

시스템 환경변수를 설정한다.

  • 각 사용자 계정에 사용자 변수 - 새로 만들기 버튼을 클릭한다.

Untitled

  • 다음과 같이 설정
  • SPARK_HOME 환경변수를 설정한다.

Untitled

  • JAVA_HOME 환경변수를 설정한다.

Untitled

  • HADOOP_HOME 환경변수를 설정한다.

Untitled

  • 환경변수를 편집한다.
  • Path 선택 → 편집(E)…

Untitled

  • 아래 코드를 추가한다.
  • 새로 만들기
    • %SPARK_HOME%\bin
    • %JAVA_HOME%\bin

Untitled

파이썬 환경설정

  • Python 환경설정을 추가한다.
  • PYSPARK_PYTHON 환경변수를 설정한다.

Untitled

  • PYSPARK_DRIVER_PYTHON 환경변수를 설정한다.
  • 일단 지운다.

Untitled

  • PYSPARK_DRIVER_PYTHON_OPTS 환경변수를 설정한다.
  • 일단 삭제한다.

Untitled

스파크 테스트

  • 명령 프롬프트에서 진행

→ c:\spark 폴더로 경로를 설정 한다.

pyspark

Untitled

  • 이번에는 [README.md](http://README.md) 파일을 불러와서 아래 코드가 실행되는지 확인한다.
  • 다음 코드를 실행해 본다.

rd = sc.textFile("README.md")

rd.count()

→ 다음 결과 출력 시 성공.

Untitled

pyspark 실습_1

pyspark_실습_2

pyspark_실습_3

Spark on linux

Spark UI

Spark ML

PSQL practice 02

  • 실무 예제로 배우는 데이터 공학 72p

  • 파일 생성, 가상 환경 진입

관리자 권한으로 실행 : Ubuntu

cd ..cd ..cd mnt/c

mkdir sql

cd sql

virtualenv venv

source venv/bin/activate

  • 라이브러리 설치

pip3 install psycopg2-binary pandas faker

pip3 install pandas

pip3 install numpy

  • 실무 예제로 배우는 데이터 공학 72p 실습 진행

mkdir chapter04

cd chapter04/

→ 파일 생성 : vi createrecord.py

→ 내용 작성

1
2
3
4
5
6
7
import numpy as np
import pandas as pd
import psycopg2

print(np.__version__)
print(pd.__version__)
print(psycopg2.__version__)

→ 저장 후 코드 실행 : 버전 확인

python3 createrecord.py

→ 버전이 출력되면 성공

  • 서비스 활성화

sudo service postgresql status

sudo service postgresql stop

sudo service postgresql start

createrecord.py에 다음 내용을 추가한다.

1
2
3
4
5
6
import psycopg2 as db
conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
# 집 pc에서는 201610974
conn=db.connect(conn_string)
cur=conn.cursor()
print("Connected:", cur)

→ 저장 후 실행

python3 createrecord.py

→ Connected : cursor…. 가 출력되면 성공.

  • pgAdmin에서 실습 진행

→ 관리자 권한으로 실행 : pgAdmin

→ 로그인 비밀번호 : 201610974

test 서버 비밀번호 : postgres

→ dataengineering → Schema → public → Tables → users우클릭

→ querytool

Untitled

→ 내용 작성 : SELECT * FROM public.users;

→ F5 키로 실행한다.

→ 다음과 같은 결과가 나온다.

Untitled

  • 실무 예제로 배우는 데이터 공학 77p
  • 데이터 추출
  • Ubuntu에서 진행

createrecord.py에 다음 내용을 추가한다.

1
2
3
4
5
6
# 데이터 추출 예제
print("step 2: ----- select -----")
query = "select * from users"
cur.execute(query)
for record in cur:
print(record)

→ 저장 후 실행

python3 createrecord.py

createrecord.py에 다음 내용을 추가한다.

#print("step is done!") print(cur.fetchall()) print("--------------") print(cur.fetchmany(3)) print("--------------") print(cur.fetchone()) print("----") print(cur.rowcount)

→ 저장 후 실행

python3 createrecord.py

createrecord.py에 다음 내용을 추가한다.

1
2
3
4
5
6
# 78페이지 8번.
conn = db.connect(conn_string)
cur = conn.cursor()
f = open('fromdb.csv', 'w')
cur.copy_to(f, 'users', sep=',')
f.close()

→ 저장 후 실행

python3 createrecord.py

ls

→ fromdb.csv 파일이 생성되면 성공

createrecord.py에 다음 내용을 추가한다.

#78p 11번

f = open('fromdb.csv', 'r') f.read() print("reading data is done!")

→ 저장 후 실행

python3 createrecord.py

  • 새 폴더 생성

→ 폴더 생성 : vi querydf.py

→ 내용 작성

import psycopg2 as db import pandas as pd conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string)

df = pd.read_sql("select * from users", conn) print(df.head())

→ 저장 후 실행

python3 querydf.py

  • Reference : 실무 예제로 배우는 데이터 공학

PSQL practice 01

  • pgAdmin은 GUI 툴 (있으나 없으나 상관이 없음)

  • sudo service postgresql start

    DB(형광등)—> 쿼리

             —> Select, insert, ...
    
        —> 형광등 켜야 불이 들어오듯 필수적이다.
    

실습

  • 실무 예제로 배우는 데이터 공학 72p부터 따라한다.

VSCord 에서 Ubuntu Terminal 열기

→ 폴더 생성 : chapter04

→ 파일 생성 : step01_createdf.py

→ 내용 작성

import psycopg2 as db

# 호스트, 데이터베이스 이름, 사용자 이름, 패스워드

conn_string = "dbname='dataengineering' host = 'localhost' user='postgres' password='postgres'"

#집pc의 경우에는 password='201610974

conn = db.connect(conn_string)

cur = conn.cursor()

print("db connecting....")

print(cur)

( 아이디/ 비밀번호 모두 postgres인 듯하다)

→ 저장

→ cd .. → cd .. → cd mnt/c → cd airflow-test → cd chapter04

→ 실행 : python3 step01_createdf.py

→ 다음 내용이 출력되면 성공.

db connecting….
<cursor object at 0x7fa097ba86d0; closed: 0>

step01_createdf.py 파일에 다음 내용 추가하고 저장

1
2
3
4
5
6
7
query = "insert into users (id,name,street,city,zip) values({},'{}','{}','{}','{}')".format(1,'Big Bird','Sesame Street','Fakeville','12345')
print(cur.mogrify(query))
query2 = "insert into users (id,name,street,city,zip) values(%s,%s,%s,%s,%s)"
data=(1,'Big Bird','Sesame Street','Fakeville','12345')
print(cur.mogrify(query2,data))
cur.execute(query2,data)
conn.commit()

→ 실행

  • pgAdmin에서 실습 진행

→ 관리자 권한으로 실행 : pgAdmin

→ 비밀번호 : postgres

→ dataengineering 우클릭 → querytool

→ 내용 작성 : SELECT * FROM public.users;

→ F5 키로 실행한다.

→ 다음과 같은 결과가 나온다.

Untitled

  • VSCord 에서 새로 파일을 작성한다.

→ 파일 생성 : step02_insertmany.py

→ 내용 작성 ( 실무 예제로 배우는 데이터 공학 75p )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import psycopg2 as db
from faker import Faker
fake=Faker()
data=[]
i=2
for r in range(1000):
data.append((i,fake.name(),fake.street_address(), fake.city(),fake.zipcode()))
i+=1
data_for_db=tuple(data)
print(data_for_db)
conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
# 집pc의 경우에는 password='201610974'

conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users (id,name,street,city,zip) values(%s,%s,%s,%s,%s)"
print(cur.mogrify(query,data_for_db[1]))
cur.executemany(query,data_for_db)
conn.commit()
query2 = "select * from users"

cur.execute(query2)
print(cur.fetchall())

→ 저장 후 실행

→ 실행 완료

  • pdAdmin 으로 이동

→ F5 키로 다시 실행

→ 다음과 같이 1000개의 데이터가 추가된다.

Untitled

  • Referencd : 실무 예제로 배우는 데이터 공학

Airflow 실습03

Elastic search 질의

  • 실무 예제로 배우는 데이터 공학 83p

관리자 권한으로 실행 : Ubuntu

→ 경로 이동 : …airflow/

source venv/bin/activate

code .

→ VSCord가 자동 실행된다

→파일 생성 : e_query.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pandas as pd
from pandas.io.json import json_normalize
from elasticsearch import Elasticsearch

# Elasticsearch 객체 생성
es = Elasticsearch()

# 일래스틱서치에 보낼 문서 본문(질의 요청) JSON 객체를 만든다.
# Matchall 검색 사용
doc = {"query" : {"match_all": {}}}
res = es.search(index="users", body = doc, size = 500)
# print(res['hits']['hits'])

# 루프로 문서를 훑으면서 각 문서의 _source 필드만 출력한다.
# for doc in res['hits']['hits']:
# print(doc['_source'])

# 질의 결과를 pandas DataFrame에 넣는 것도 가능
df = json_normalize(res['hits']['hits'])
print(df.head())
print(df.info())

print(df['_source.city'].value_counts())

postgreSQL → Elastic search 데이터 전송

  • 교재 88p
  • Elastic search 가동된 상태에서 진행

→ 선행 학습 링크 참고 : postgreSQL 실습 (notion.so)

  • pgAdmin 준비된 상태에서 진행

→ 다음과 같이 출력되는 상태여야 한다.

Untitled

  • VSCord 에서 작업
  • 교재 88p

dags 폴더 아래에 파일 생성

→ 파일 생성 : airflodb.py

→ 코드 작성

import datetime as dt

from datetime import timedelta

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator

import pandas as pd

import psycopg2 as db

from elasticsearch import Elasticsearch

print("Hello")

→ 경로 이동 : (venv) kmk3593@DESKTOP-LNQ780K:/mnt/c/airflow-test/dags$

→ 실행

→ Hello 가 출력되었으므로 성공.

Untitled

  • 코드 추가 작성
  • 다음 내용을 airflodb.py에 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

# queryPostgresql 지정
def queryPostgresql():
conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
print("DB connecting....", conn)

# 데이터 추출
df = pd.read_sql("select name, city from users", conn)
df.to_csv("postgresqldata.csv")
print("----Data Saved----")

# insertElasticSearch
def insertDataElasticsearch():

# Elastic 인스턴스 생성
es = Elasticsearch()

# 데이터 불러오기
df = pd.read_csv("postgresqldata.csv")
for i, r in df.iterrows():
doc = r.to_json()
res = es.index(
index="frompostgresql"
, doc_type="doc", body=doc
)
print(res)

# DAG를 위한 인수들을 지정
default_args = {
'owner' : 'human',
'start_date' : dt.datetime(2022, 4, 18),
'retries' : 1,
'retry_delay': dt.timedelta(minutes = 5)
}

with DAG('MyDBdag',
default_args = default_args,
schedule_interval = timedelta(minutes=5), # '0 * * * * ',
) as dag:

getData = PythonOperator(
task_id = "QueryPostgreSQL"
, python_callable=queryPostgresql
)

insertData = PythonOperator(
task_id = "InsertDataElasticsearch"
, python_callable = insertDataElasticsearch
)

getData >> insertData
  • Airflow 가동

→ 저장 후 실행

python3 airflodb.py

→ airflow 실행

airflow db init

→(재시도할 경우, 실행 : airflow db reset )

→ Terminal 2개 준비하고 다음 명령 실행

airflow webserver -p 8080

airflow scheduler

→ 다음 주소로 진입

http://localhost:8080/

→ Dags

→ 활성화 : MyDBdag

→ 더블 클릭 : MyDBdag

Untitled

→ Tree

→ Update

→ 다음과 같이 출력되면 성공

Untitled

  • Reference : 실무 예제로 배우는 데이터 공학

Airflow 실습02

데이터베이스를 위한 아파치 에어플로 데이터 파이프라인 구축

실무 예제로 배우는 데이터 공학 87 ~ 91p

관리자 권한으로 실행 : ubuntu

→ elasticsearch 가동하기

Untitled

→ Kibana 가동하기

Untitled

  • 경로 이동, 가상 환경 진입

cd ..cd ..cd mnt/c/airflow-test

source venv/bin/activate

  • 교재의 elastic search 버전을 참고하여 설치

pip3 install elasticsearch==7.17.2

Untitled

  • 교재 80p
  • 일단 vi 대신에 code . 를 사용한다.

code .

( 안 될 경우, Ubuntu를 다시 시작한다)

→ 코드 실행 시, VSCord가 자동으로 시작된다

Untitled

  • 폴더 생성

→ 폴더 생성 : chapter04

→ 파일 생성 : e_search.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

doc={"name": fake.name(),"street": fake.street_address(), "city": fake.city(),"zip":fake.zipcode()}

res=es.index(index="users",doc_type="doc",body=doc)
print(res)

doc={"query":{"match":{"_id":"pDYlOHEBxMEH3Xr-2QPk"}}}
res=es.search(index="users",body=doc,size=10)
print(res)
  • 가상환경 가동 후 실행

→ 저장

→ 터미널

source venv/bin/activate

cd chapter04/

python3 e_search.py

  • 교재 81p

→ 파일 작성 : e_search02.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

actions = [
{
"_index": "users",
"_type": "doc",
"_source": {
"name": fake.name(),
"street": fake.street_address(),
"city": fake.city(),
"zip":fake.zipcode()}
}
for x in range(998) # or for i,r in df.iterrows()
]

response = helpers.bulk(es, actions)
print(response)

→ 저장 후 실행

python3 e_search02.py

→ 다음과 같이 (998,[]) 출력되면 성공

Untitled

  • Kibana 페이지 실행

→ 주소창에 입력 : localhost:5601/

→ 메뉴바

→ Stack Management

Untitled

→ Index Patterns

Untitled

→ Create index pattern

→ 이름 : users

Untitled

→ Create index pattern

Untitled

→ 햄버거 메뉴바 열기

→ Discover

Untitled

→ 앞에서 추가한 index의 문서를 확인할 수 있다.

Untitled

데이터 저장소

  • RDBMS

— 종류 : Oracle, PostgreSQL, MySQL, 빅쿼리(구글),…

— 표준 SQL (하나를 잘 알면, 거의 비슷!)

  • NoSQL

— 종류 : Elasticsearch, 몽고 DB (무료 버전)

어려운 것 조회 하는 방법이 RDMBS ≠ NoSQL 다름 (완전 다름!)

Airflow 재설치 및 데이터 파이프라인 구축

Setting up Apache-Airflow in Windows using WSL2 - Data Science | DSChloe

Airflow 데이터 파이프라인 구축 예제 - Data Science | DSChloe

  • 체크포인트
  1. 가상환경을 만들 수 있는냐 (virtualenv 라이브러리)

  2. 경로 이동이 자유로운가? cd 사용

  3. 환경 변수를 이해하고 잡을 수 있는가?

    vi 편집기를 자유자재로 쓸 수 있는가?

  4. 파이썬 라이브러리를 가상환경에 자유자재로 설치 할 수 있는가?

  5. 가상 환경에 자유롭게 출입할 수 있는가?

  • 사전 준비

VSCord 의 airflow.cfg 에서 진행

→ 내용 변경 : load_examples = TrueFalse

→ c드라이브 → airflow_test → dags와 chapter03, chapter04 를 배경화면에 빼둔다.

Untitled

  • airflow-test 내용물 삭제

Ubuntu 의 airflow 경로에서 진행

deactivate

sudo rm -rf *

  • 다시 가상환경 생성

virtualenv venv

ls

  • 필요한 내용이 작성되어 있는지 확인

vi ~/.bashrc

→ 내용 확인 : export AIRFLOW_HOME=/mnt/c/airflow-test

source ~/.bashrc

echo $AIRFLOW_HOME

pwd

  • 가상환경 on

source venv/bin/activate

  • 라이브러리 설치

pip3 install 'apache-airflow[postgres, slack, celery]'

  • db 설정

airflow db init

  • 계정 등록
  • firstname이 실행 결과에 영향을 주는가

airflow users create --username airflow --password airflow --firstname evan --lastname airflow --role Admin --email your_email@some.com

  • VSCord 에서 진행

→ 폴더 생성 : ( file → open folder → airflow-test )

→ airflow.cfg 파일

→ 내용 변경 : load_examples=TrueFalse

Untitled

  • Ubuntu 에서 진행

airflow db reset

→ 가상 환경 상태에서 다음 코드 실행

airflow webserver -p 8080

Untitled

→ ctrl + c 로 빠져나온다.

데이터 파이프라인 구축

개요

  • 이번에는 CSV-JSON으로 데이터를 변환하는 파이프라인을 구축하도록 한다.

Step 01. Dags 폴더 생성

  • 프로젝트 Root 하단에 Dags 폴더를 만든다.

    • dags 폴더를 확인한다.
  • dags 파일 생성

mkdir dags

ls

Step 02. 가상의 데이터 생성

  • 라이브러리 설치

→ 가상 환경에서 진행

pip3 install faker pandas

  • 폴더, 파일 생성

mkdir data

cd data

vi step01_writecsv.py

→ 코드 작성.

+ 앞으로는 이런 방식으로 코드를 작성한다.

+실무에서 필요한 습관이다.
  • faker 라이브러리를 활용하여 가상의 데이터를 생성한다. (파일 경로 : data/step01_writecsv.py)

Untitled

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from faker import Faker
import csv
output=open('data.csv','w')
fake=Faker()
header=['name','age','street','city','state','zip','lng','lat']
mywriter=csv.writer(output)
mywriter.writerow(header)
for r in range(1000):
mywriter.writerow([fake.name(),
fake.random_int(min=18, max=80, step=1),
fake.street_address(),
fake.city(),
fake.state(),
fake.zipcode(),
fake.longitude(),
fake.latitude()])
output.close()
  • 코드를 실행한다.
  • VSCord에서 data.csv 파일이 생성되어야 한다.

python3 step01_writecsv.py

ls

cat data.csv

Step 03. csv2json 파일 구축

  • 이번에는 CSV와 JSON 변환 파일을 구축하는 코드를 작성한다. (파일 경로 : dags/csv2json.py)\
  • 주요 목적 함수 csvToJson()의 역할은 data/data.csv 파일을 불러와서 fromAirflow.json 파일로 변경하는 것이다.
  • DAG는 csvToJson 함수를 하나의 작업으로 등록하는 과정을 담는다. 작업의 소유자, 시작일시, 실패 시 재시도 횟수, 재시도 지연시 시간을 지정한다.
  • print_starting >> csvJson 에서 >> 는 하류 설정 연산자라고 부른다. (동의어 비트 자리이동 연산자)

cd ..

cd dags

vi csv2json.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd

def csvToJson():
df=pd.read_csv('data/data.csv')
for i,r in df.iterrows():
print(r['name'])
df.to_json('fromAirflow.json',orient='records')

default_args = {
'owner': 'human',
'start_date': dt.datetime(2020, 3, 18),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}

with DAG('MyCSVDAG',
default_args=default_args,
schedule_interval=timedelta(minutes=5), # '0 * * * *',
) as dag:

print_starting = BashOperator(task_id='starting',
bash_command='echo "I am reading the CSV now....."')

csvJson = PythonOperator(task_id='convertCSVtoJson',
python_callable=csvToJson)

print_starting >> csvJson
  • 코드를 실행한다.
  • VSCord에서 json 파일이 생성되어야 한다.

python3 csv2json.py

Step 04. Airflow Webserver 및 Scheduler 동시 실행

  • 이제 웹서버와 스케쥴러를 동시에 실행한다. (터미널을 2개 열어야 함에 주의한다.)

VSCord 에서 WSL 터미널을 2개 띄운다.

airflow webserver -p 8080

airflow scheduler

Untitled

  • error 발생할 경우 대처

airflow.cfg의 endproint_url = 8080 체크

airflow db reset

airflow webserver -p 8080

airflow scheduler

→ 이 과정을 반복

→ 그래도 안 되면 airflow 지우고 다시 시작

이제 WebUI를 확인하면 정상적으로 작동하는 것을 확인할 수 있다

Step 05. 작업 결과물 확인

  • 최초 목적인 fromAirflow.json 로 정상적으로 변환되었는지 확인하도록 한다.

    • fromAirflow.json 파일이 확인된다면, 정상적으로 작업이 끝난 것이다.

    ls

    → 다음 내용이 출력되면 성공

    airflow-webserver.pid airflow.cfg airflow.db dags data fromAirflow.json logs venv webserver_config.py

    human@DESKTOP-V24TVMS:/mnt/c/airflow$ export AIRFLOW_HOME="$(pwd)"

    human@DESKTOP-V24TVMS:/mnt/c/airflow$ echo $AIRFLOW_HOME

  • Reference : 실무 예제로 배우는 데이터 공학

VSCord Install

VSCode Remote WSL

VSCode Remote WLS 연동 - Data Science | DSChloe

  • eclipse 보다 가볍다

VSCode 설치

Untitled

  • 설치 시, 환경 변수 체크란 잘 확인한다.

Untitled

  • 설치가 다 끝난 후에는 재부팅을 실시한다.
  • 관리자 권한으로 실행 : visual studio

Remote WSL 연동

  • 좌측 탭에서 Extension 버튼을 클릭한다.

Untitled

  • 검색 창에서 Remote WSL을 검색 후, 설치를 진행한다.

Untitled

  • 모두 클릭 후, Mark Done을 선택한다.

Untitled

  • Open Folder를 클릭한다.

Untitled

  • WSL에서 설치했던 airflow-test 폴더를 선택한다.

file → Open Folder → c 드라이브 → airflow_test 열기

Untitled

  • 메뉴 바에 Terminal 선택 후, 화면 하단에서 WSL이 있는지 확인한다.
  • Terminal 열어서 Ubuntu 실행한다.

Terminal

→ new terminal

→ 우측의 + 버튼으로 Ubuntu(WSL) 열기

Untitled

사용법

  • 해당 메뉴를 클릭하면 아래와 같이 터미널이 변경된 것을 확인할 수 있다.
  • 이번엔 서버를 가동해본다.

source venv/bin/activate

airflow webserver -p 8081

  • 사용해본다.

which python3

라이브러리 설치

  • 앞으로 ubuntu를 키지 않고 VScode에서 사용한다.
  • 라이브러리를 설치한다.

pip3 install faker

pip3 install pandas

실습

  • 파이썬 사용

폴더 생성 : 폴더 그림+ 버튼

→ chapter03 폴더 생성

→ 파일 생성 : 파일 그림+ 버튼

hello.py 파일 생성

→ 내용 작성 : print(”Hello World!”)

→ save ( ctrl + s)

Untitled

wsl Terminal 에서 다음 내용 작성

cd chapter 03/

python3 hello.py

hello.py 실행되면 성공

Untitled

  • 가상파일 만들기

파일 생성 : step01_writecsv.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 44p

from faker import Faker import csv output=open('data.csv','w') fake=Faker() header=['name','age','street','city','state','zip','lng','lat'] mywriter=csv.writer(output) mywriter.writerow(header) for r in range(1000): mywriter.writerow([[fake.name](http://fake.name/)(),fake.random_int(min=18, max=80, step=1), fake.street_address(), fake.city(),fake.state(),fake.zipcode(),fake.longitude(),fake.latitude()]) output.close()

→ 저장 후 실행 : python3 step1_writecsv.py

→ data.csv 파일이 생성된다.

파일 생성 : step02_readcsv.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 44 ~ 45p

import csv

with open('data.csv') as f:

myreader = csv.DictReader(f)

headers = next(myreader)

for row in myreader:

print(row['name'])

→ 저장 후 실행 : python3 step2_readcsv.py

→ 여러 사람의 이름이 출력되면 성공

파일 생성 : step03_pandas.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 p

import pandas as pd

df = pd.read_csv('data.csv')

df.head(10)

df.to_csv('fromdf.csv', index=False)

→ 저장 후 실행

→ data.csv 파일 내용과 동일한 fromdf.csv 파일이 생성된다.

파일 생성 : step04_writejson.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 48p

from faker import Faker

import json

output = open('data.json', 'w')

fake = Faker()

alldata = {}

alldata['records'] = []

for x in range(1000):

data = {

"name"   : fake.name(),

"age"    : fake.random_int(min=18, max=80, step=1),

"street" : fake.street_address(),

"city"   : fake.city(),

"state"  : fake.state(),

"zip"    : fake.zipcode(),

"lng"    : float(fake.longitude()),

"lat"    : float(fake.latitude())}

alldata['records'].append(data)

json.dump(alldata, output)

→ 저장 후 실행

→ data.json 이 생성된다.

데이터 불러오기

파일 생성 : step05_readjson.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 49p

import json

with open('data.json', 'r') as f:

data = json.load(f)

print("Data Type is ", type(data))

print(data['records'][0]['name'])

→ 저장 후 실행

→ 사람 이름이 출력된다.

파일 생성 : step06_pandas.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 49p

import pandas.io.json as pd_JSON

import pandas as pd

f = open('data.json', 'r')

data = pd_JSON.loads(f.read())

df = pd.json_normalize(data, record_path='records')

print(df.head(2))

print(df.head(2).to_json())

print(df.head(2).to_json(orient='records'))

→ 저장 후 실행

→이름, 거리, 도시 등이 출력된다.

전처리 순서

CSV —> 데이터 프레임 변환 —> 오라클 or PostgreSQL

비정형 데이터

-이미지 / 텍스트

JSON —> Pandas 데이터 프레임 변환 —> 전처리

—> JSON(NoSQL) —> ElasticSearch —> 시각화(Kibana)

파일 생성 : step07_airflowcsv.py

→ 코드 작성 : 실무 예제로 배우는 데이터 공학 51 ~ 54 p

→ 저장 후 실행

톱니바퀴 모양의 ‘airflow’를 연다

→ 다음 그림과 같이 경로가 잡혀있다.

Untitled

이 부분은 일단 넘어간다.

폴더 생성 : airflowcsv.py

→ 파일 복사 붙여넣기 : data.csv

  • Apache-Airflow 세팅 참고하여 진행

-Setting up Apache-Airflow in Windows using WSL2 - Data Science | DSChloe

airflow dbinit

airflow users create --username airflow --password airflow --firstname evan --lastname airflow --role Admin --email your_email@some.com

airflow webserver -p 8081

source venv/bin/acivate

airflow scheduler

로그인

아이디 :airflow

비번 :

cd dags

airflow dbinit

aiflow us,,,,

airflow webserber -p 8081

PSQL Install

PostgreSQL Installation on WSL2 and Windows

PostgreSQL Installation on WSL2 and Windows - Data Science | DSChloe

(password : 2016*****)

( 서버 password : postgres )

개요

  • WSL2에서 PostgreSQL을 설치한다.
  • pgAdmin은 Windows에 설치한다.

터미널 업그레이드

  • 먼저 WSL 터미널을 열고, Ubuntu 패키지를 모두 업데이트 및 업그레이드를 한다.

Windows Terminal → wsl bash

또는

Ubuntu → ..cd → ..cd

sudo apt update

sudo apt-get upgrade

PostgreSQL Installation in WSL2

  • 이번에는 WSL2에서 PostgreSQL을 설치한다. 설치가 종료되면, 반드시 버전을 확인한다.

sudo apt install postgresql postgresql-contrib

psql --version

  • 설치 이후에는 Database를 접근 가능하도록 활성화해야 한다.
    • 포트가 활성화 되어 있지 않다면 아래와 같은 메시지가 나타날 것이다.

sudo service postgresql status

  • 이번에는 활성화를 해보도록 한다. 온라인이라는 메시지가 나타난다면 활성화가 되었다는 것을 의미한다.

sudo service postgresql start

sudo service postgresql status

이번에는 활성화된 데이터베이스를 종료시킨다

sudo service postgresql stop

sudo service postgresql status

사용자 계정 Password 설정

  • 기본적으로 admin 사용자로 등록이 되어 있다. 보통 DB 초기 세팅 시에는 패스워드를 입력받아야 한다. ( password : 2016***** )

sudo passwd postgres

• 여기까지 했다면, WSL2에서 추가로 설정할 것은 더 없다.

pgAdmin Installation on Windows

  • 이번에는 pgAdmin을 설치한다. (최신버전 설치, pgAdmin 4 v6.8)웹사이트 : https://www.pgadmin.org/download/pgadmin-4-windows/
  • 설치 할 때는 관리자로 실행하며, 아래 그림과 나타난다면, install for all users를 선택한다.

Untitled

  • 설치가 완료된 뒤에는 pgAdmin 이 검색되는지를 확인한다.
  • WSL2 에서 PostgreSQL 서비스를 활성화 해야 한다.
  • 처음 실행 시 나타나는 화면에 나오는 Password 설정
  • WSL2에서 설정한 Password를 입력한다. (password : 2016*****)
    • 이번에는 서버를 설정하도록 한다 ( 서버 password : postgres )

Untitled

server 우 클릭

→ register → server

→ 내용 작성

→ General Tab 의 Name에 입력 : test

Untitled

  • connection tab 에서 작성

→ host 에 입력 : 127.0.0.1

→ Password는 WSL2에서 입력했던 Password를 입력한다.

Untitled

서비스를 활성화한다.

sudo service postgresql start

다음과 같이 Password를 설정하도록 한다

sudo -u postgres psql -c "ALTER USER postgres PASSWORD '<new-password>';"

sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';"

  • 위 설정이 끝난 후, 재 접속하면 정상적으로 접근이 되는 것을 확인할 수 있다.
  • 비밀번호가 postgres 로 변경되었다.

DB 생성 및 확인

  • test 서버에 접속을 했다면, 이제 DB생성을 해본 후, pgAdmin과 WSL2에서 각각 확인을 한다.
  • 먼저, Database에서 마우스 우클릭 후, 아래와 같이 순차적으로 클릭한다.

Databases 우클릭

→ create → database..

→ 새로운 데이터베이스명은 dataengineering으로 명명한다.

Untitled

  • 이번에는 아래 그림과 같이 dataengineering 데이터베이스의 노드 중 shemas를 확장하고 다시 public을 확장한다.

Untitled

dataengineering

→ shemas

→ public

→ tables 우클릭

→ Create → Table

→ General 탭 Name 입력 : users

→ Column 탭에서는 아래와 같이 테이블 열을 추가한다

→ 우측의 + 버튼을 이용한다.

Untitled

wsl2 가상환경 : postgresql 서버 & DB

윈도우 : GUI - pgAdmin

  • 이번에는 psql에 접속 후 dataengineering DB와 생성된 테이블을 조회하는 쿼리를 실행한다.

    • dataengineering 테이블이 조회되는지 확인한다.

    Untitled

    sudo -u postgres psql (psql 접속 명령)

    → postgres=# 형태의 프롬프트가 출력된다.

    → 그림 맨 밑줄처럼 \l 을 입력하고 enter

    → dataengineering 이 출력되면 성공

이번에는 생성된 dataengineering DB에 연결 후, 테이블을 조회한다

sudo -u postgres psql ( psql에 접속)

→ postgres=# 형태의 프롬프트가 출력된다.

\c dataengineering

→ (dataengineering DB에 연결되어

dataengineering=# 형태로 바뀐다. )

\dt

→ users 테이블이 출력 되면 성공

\q 하여 빠져나온다.

sudo service postgresql stop 하여 DB 종료

→ 반드시 종료하자

Refefence

SQLAlchemy : 반드시 사용하자.

나중에라도 파이썬에 연결하여 사용한다.

https://www.sqlalchemy.org/

postgreSQL 실습

postgreSQL 실습_2