Data Import on SQL Developer

데이터 임포트

  • SQL Developer 열기
  • 우클릭 : 테이블 → 데이터 임포트

Untitled

  • ‘찾아보기’ → 임포트할 파일을 찾아서 열기

Untitled

  • 다음과 같이 설정한다.
    • 인코딩 : UTF-8

Untitled

  • 테이블 이름을 설정한다.

Untitled

  • 그 외에는 설정을 디폴트로 두고 넘어간다.
  • 완료

Untitled

  • 다음과 같이 알림창이 출력된다.

Untitled

  • SQL문을 실행하여 정상 작동하는지 시험해본다.

Untitled

  • 정상적으로 출력되었다면 성공.

Oracle on Jupyter Lab

사전준비

  • Oracle 설치
  • SQL Developer 설치
  • Oracle DB 접속 설정

Jupyter Lab에서 설치 및 설정 진행

  • 순서대로 작성하고 실행한다.
  • ipython 설치

pip install ipython-sql

  • cx_Oracle 설치

pip install cx_Oracle

  • 로드

%load_ext sql

  • DB 연동
    • DB 접속 선택을 참고하여 코드를 작성한다.
    1
    2
    # Oracle
    %sql oracle://user_name:password@127.0.0.1:port_number/db
    • 코드 사용 예시 : %sql oracle://ora_user:evan@127.0.0.1:1521/myoracle
    • user_name, password, port_number, db는 본인의 DB 접속 선택을 참고한다.

Untitled

  • 여기까지 코드들을 실행하면 다음과 같이 출력된다.

Untitled

실행

  • 쿼리 작성 후 실행
    • 단, %%sql 을 붙여야 한다.
    • 쿼리 끝에 세미콜론(;)을 붙여서는 안 된다.

Untitled

Oracle_setting

사전준비

  • 오라클을 설치한다.

구글링 : oracle database 19c download

Oracle Database 19c Download for Microsoft Windows x64 (64-bit) | Oracle 대한민국

Untitled

  • SQL Developer 설치한다.

구글링 : sql developer

Oracle SQL Developer Downloads

Untitled

  • c 드라이브 경로에 폴더 생성 : sql_lecture

앞서 다운로드한 설치 파일을 sql_lecture 폴더에 정리한다.

  • 압축 해제한다.

우클릭 후에 “extract WINDOWS.X64\”

  • 관리자 권한으로 실행 : setup
  • 다음 경로로 이동하여 실행하면 된다.

c 드라이브 → sql_lecture 폴더 → WINDOWS.X64_193000_db_home 폴더 → setup

Untitled

  • 체크 : 단일 인스턴스 데이터베이스 생성 및 구성

Untitled

  • 체크 : 데스크톱 클래스

Untitled

  • 체크 : 가상 계정 사용

Untitled

  • 오라클 SQL과 PL/SQL을 다루는 기술 22p 참고하여 설정한다.
  • 전역 데이터베이스 이름 : myoracle
  • 비밀번호 : 1234
  • 체크 해제 : 컨테이너 베이스로 생성

Untitled

  • 설치까지 진행한다.
  • 설치 완료

Untitled

1단계 sqlplus 실행하기

  • 관리자 권한으로 실행 : SQL Plus

Untitled

  • 정보를 입력한다.

→사용자명 : system

→비밀번호 : 1234

Untitled

2단계 : 테이블 스페이스 생성하기

  • 오라클 SQL과 PL/SQL을 다루는 기술 27p 참고
  • 다음 코드를 사용하여 테이블 스페이스를 생성한다.
    • 테이블 스페이스는 myts라 명명하고 100MB 크기로 생성
    • 만약 데이터가 증가하면 5MB씩 자동 증가 옵션 추가

CREATE TABLESPACE myts DATAFILE 'C:\sql_lecture\oradata\MYORACLE\myts.dbf' SIZE 100M AUTOEXTEND ON NEXT 5M;

Untitled

3단계 : 사용자 생성

  • 해당 사용자에게 롤(Role, 권한)을 부여해야 한다. 현 시점에서는 ‘ora_user’ 사용자에게 DBA라는 롤을 부여한다.

    • 이 권한을 부여받으면 오라클에서 제공하는 웬만한 기능을 모두 사용한다.
  • 사용자를 생성하는 코드를 작성한다.

(패스워드를 evan으로 할 경우, 다음과 같이 작성)

CREATE USER ora_user IDENTIFIED BY evan DEFAULT TABLESPACE MYTS TEMPORARY TABLESPACE TEMP;

Untitled

4단계 : 사용자 계정으로 DB에 접속하기

  • ora_user로 접속한다.

GRANT DBA TO ora_user;

connect ora_user/evan;

  • 접속 후, show user; 입력하면 현재 로그인한 사용자 이름이 출력된다.

show user;

Untitled

SQL Developer 실행

  • 새 접속 화면이 나타나면 접속 이름, 사용자 이름을 ora_user로 입력, 비밀번호는 입력, SID 항목에는 처음 설치 시 이름인 myoracle을 입력하고 테스트를 실행한다.

  • 압축 해제한다.

우클릭 후에 “extract sqldeveloper214.3…”

  • 관리자 권한으로 실행 : sqldeveloper
  • 다음 경로로 이동하여 실행하면 된다.

c 드라이브 → sql_lecture 폴더 → sqldeveloper-21.4.3.063.0100-x64 폴더

→ sqldeveloper 폴더 → sqldeveloper

  • 만약 다음 확인 창이 출력되면 ‘아니오’ 선택한다.

Untitled

  • Oracle 접속을 새로 만든다.

우클릭 : Oracle 접속 → 새 접속

Untitled

  • 다음과 같이 설정하고 ‘테스트’
    • 사용자 이름 : ora_user
    • 비밀번호 : evan
  • 상태 : 성공
  • 성공했다면 ‘접속’

Untitled

  • 다음과 같이 출력된다.

Untitled

환경설정

  • 다음 환결설정에서 인코딩을 UTF-8로 변경한다.
    • 메뉴 바 → 도구 → 환경 설정 → 환경 → 인코딩 : UTF-8

Untitled

SQL Developer 날짜 기록

  • 다음 경로로 [NLS] 항목을 선택한다.
    • 메뉴 바 → 도구 → 환경 설정 → 데이터 베이스 → NLS
  • NLS에서 ‘시간 기록 형식’을 수정.
    • 다음과 같이 수정한다.
    • 시간 기록 형식 : YYYY/MM/DD HH24:MI:SS

Untitled

샘플 스키마 설치

Untitled

  • 관리자 권한으로 실행 : 명령 프롬프트
  • C:\backup 경로로 이동한다.
  • 다음 코드 실행 : expall.dmp 을 올린다.

imp ora_user/evan file=expall.dmp log=empall.log ignore=y grants=y rows=y indexes=y full=y

  • 다음 코드 실행 : expcust.dmp 을 올린다.

imp ora_user/evan file=expcust.dmp log=expcust.log ignore=y grants=y rows=y indexes=y full=y

Untitled

  • 임포트가 정상 종료되었다면 Oracle SQL로 이동하여 작업
  • 다음 코드를 작성

SELECT table_name FROM user_tables;

→ 실행 : ctrl + enter

→ 다음과 같이 출력되면 성공.

Untitled

  • Git 연동

SQL Developer with Git - Data Science | DSChloe

Oracle 실습01

Oracle 실습02

Oracle 실습03

Oracle 실습04

Oracle 실습05

Oracle 실습06

Oracle on Jupyter Lab

Crawling_practice

  • 웹 크롤링을 시도해본다.
  • 우선 Pycharm 환경에서 가상환경을 생성해야 한다.

바탕화면에 crawling 폴더 생성

→ 우클릭하여 pycharm으로 열기

→ File → Settings

→ Project : crawling → python interpreter

→ 톱니모양 → add

Untitled

  • 필요한 패키지들을 설치한다.

→ git bash 터미널

pip install beautifulsoup4

pip install numpy pandas matplotlib seaborn

pip install requests

  • 브라우저에서 검색을 진행

→ 검색 : 확진자수

→ 우클릭 → 검사

Untitled

  • 원하는 정보가 포함된 코드를 선택 가능

Untitled

  • index.html을 생성한다.

crawling 폴더 우클릭 → New → HTML.file

Untitled

  • 다음과 같이 입력하고 index 파일을 열어본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<titl>test</titl>
</head>
<body>
<h1>aaaaaaaa</h1>
<h2>dddd</h2>
<div class="chapter01">
<p>Don't Crawl here </p>
</div>
<div class="chapter02">
<p>Just Crawling here</p>
</div>
</body>
</html>
  • index 파일을 열면 index.html에 작성한 대로 출력된다.

Untitled

  • 이번에는 다른 파일에 코드를 작성해보자.
    • 일단 main.py에 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
from bs4 import BeautifulSoup

# 첫 번째 작업 index.html 파일을 BeautifulSoup 객체로 변환
## 클래스 변환 --> 클래스 내부의 메서드 사용

# html 파일을 변환
soup = BeautifulSoup(open("index.html", encoding='UTF-8'), "html.parser")
# print(type(soup))

# print(soup.find("div", class_="chapter02"))
# print(soup.find("p"))
results = soup.find_all("p")
print(results[1])
  • Index.html에서 크롤링하여 다음과 같이 출력된다.

python main.py

Untitled

Untitled

  • 정렬 : ctrl + alt + l

Python

웹상에 있는 데이터를 숩집하는 도구

  • BeautifulSoup 가장 일반적인 수집 도구 (CSS 통해서 수집)
  • Scrapy (CSS, XAPTH 통해서 데이터 수집 + JavaScript)
  • Selenium (CSS, XPATH 통해서 데이터 수집 + JAVAScript)

—> 자바 필요 + 여러가지 설치 도구 필요

웹 사이트 만드는 3대 조건 + 1

  • HTML, CSS, JavaScript, Ajax (비동기처리)

웹 사이트 구동 방식

Crawling 실습

Crawling_setting

  • 웹 크롤링을 시도해본다.
  • 우선 Pycharm 환경에서 가상환경을 생성해야 한다.

바탕화면에 crawling 폴더 생성

→ 우클릭하여 pycharm으로 열기

→ File → Settings

→ Project : crawling → python interpreter

→ 톱니모양 → add

Untitled

  • 필요한 패키지들을 설치한다.

→ git bash 터미널

pip install beautifulsoup4

pip install numpy pandas matplotlib seaborn

pip install requests

  • 브라우저에서 검색을 진행

→ 검색 : 확진자수

→ 우클릭 → 검사

Untitled

  • 원하는 정보가 포함된 코드를 선택 가능

Untitled

  • index.html을 생성한다.

crawling 폴더 우클릭 → New → HTML.file

Untitled

  • 다음과 같이 입력하고 index 파일을 열어본다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<titl>test</titl>
</head>
<body>
<h1>aaaaaaaa</h1>
<h2>dddd</h2>
<div class="chapter01">
<p>Don't Crawl here </p>
</div>
<div class="chapter02">
<p>Just Crawling here</p>
</div>
</body>
</html>
  • index 파일을 열면 index.html에 작성한 대로 출력된다.

Untitled

  • 이번에는 다른 파일에 코드를 작성해보자.
    • 일단 main.py에 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
from bs4 import BeautifulSoup

# 첫 번째 작업 index.html 파일을 BeautifulSoup 객체로 변환
## 클래스 변환 --> 클래스 내부의 메서드 사용

# html 파일을 변환
soup = BeautifulSoup(open("index.html", encoding='UTF-8'), "html.parser")
# print(type(soup))

# print(soup.find("div", class_="chapter02"))
# print(soup.find("p"))
results = soup.find_all("p")
print(results[1])
  • Index.html에서 크롤링하여 다음과 같이 출력된다.

python main.py

Untitled

Untitled

  • 정렬 : ctrl + alt + l

Python

웹상에 있는 데이터를 숩집하는 도구

  • BeautifulSoup 가장 일반적인 수집 도구 (CSS 통해서 수집)
  • Scrapy (CSS, XAPTH 통해서 데이터 수집 + JavaScript)
  • Selenium (CSS, XPATH 통해서 데이터 수집 + JAVAScript)

—> 자바 필요 + 여러가지 설치 도구 필요

웹 사이트 만드는 3대 조건 + 1

  • HTML, CSS, JavaScript, Ajax (비동기처리)

웹 사이트 구동 방식

Crawling 실습

Spark UI

  • 가상환경을 생성한다.

/mnt/c 경로에서 실행

mkdir temp

cd temp

virtualenv venv

Untitled

  • 가상환경에서 pyspark를 설치한다.

source venv/bin/activate

pip install pyspark

  • 다음 링크 접속

Quick Start - Spark 3.2.1 Documentation (apache.org)

  • 다음 내용을 복사한다.

Untitled

1
2
3
This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkSession to create Datasets. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). SimpleApp is simple enough that we do not need to specify any code dependencies.

We can run this application using the bin/spark-submit script:

mkdir data

cd data

ls

vi README.md

→ 위에서 복사한 내용을 붙여넣는다.

:wq

→ 내용 확인cat README.md

cd ..

vi SimpleApp.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark.sql import SparkSession

logFile = "data/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

input("Typing....")

spark.stop()

→ 저장 후 실행

→ python3 SimpleApp.py

→ 경로 확인 : echo $SPARK_HOME

$SPARK_HOME/bin/spark-submit --master local[4] SimpleApp.py

Untitled

  • 코드 샐행 후
  • 위 결과 참고하여 address 복사

→ 뒤에 :4041을 추가하여 주소창에 입력한다.

(코드 실행 후 나오는 텍스트에서 SparkUI를 확인하자)

→ 주소창에 입력하여 접속 : http://172.19.91.118:4041

→ 다음 화면 출력 시 성공.

Untitled

Spark on Linux

WSL2에서의 Spark 설치 - Data Science | DSChloe

개요

  • 간단하게 PySpark를 설치해보는 과정을 작성한다.
  • WSL2 설치 방법은 다루지 않는다.

필수 파일 설치

  • 설치가 안 되었을 경우에 설치한다.
  • 자바 및 Spark 파일을 설치하도록 한다.
1
2
3
$ sudo apt-get install openjdk-8-jdk
$ sudo wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
$ sudo tar -xvzf spark-3.2.0-bin-hadoop3.2.tgz

.bashrc 파일 수정

  • 경로를 다음과 같이 설정한다.
1
2
evan@evan:/mnt/c/hadoop$ pwd
/mnt/c/hadoop
  • 설치한 파일은 다음과 같다.
1
2
evan@evan:/mnt/c/hadoop$ ls
spark-3.2.0-bin-hadoop3.2 spark-3.2.0-bin-hadoop3.2.tgz
  • vi ~/.bashrc 파일을 열고 다음과 같이 코드를 작성한다.
    • 다른 코드는 건드리지 않는다.
    • 마지막 라인에서 작성한다.
1
2
3
4
5
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_HOME=/mnt/c/spark
export PATH=$JAVA_HOME/bin:$PATH
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_PYTHON=/usr/bin/python3

Untitled

테스트

  • pyspark를 실행한다. (경로에 주의한다)
  • SPARK_HOME을 다음과 같이 설정했으니 해당 경로에서 실행.
  • export SPARK_HOME=/mnt/c/spark

경로 이동 : cd ..

cd spark/

source ~/.bashrc

pyspark

Untitled

  • 정상적으로 작동한지 테스트한다.
  • 해당 경로에 README.md 파일이 있다면 시행해보자.

rd = sc.textFile("README.md")

rd.count()

→ 다음과 같이 출력된다면 성공이다.

Untitled

Spark on Windows

Spark Installation on Windows 10 - Data Science | DSChloe

사전준비

  • 스파크를 설치하는 과정이다.
  • 사전에 파이썬 3가 설치되어 있어야 한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

자바 다운로드

Spark 다운로드

Untitled

WinRAR 다운로드

  • 이 때, .tgz 압축파일을 풀기 위해서는 WinRAR 을 설치한다.

Untitled

winutils 다운로드

  • 이번에는 스파크가 윈도우 로컬 컴퓨터가 Hadoop으로 착각하게 만들 프로그램이 필요하다.
  • 이전에 받은 spark-3.2.0-bin-hadoob-3.2.tgz 와 버전이 일치하는 것을 선택해야 한다.
    • 3.2.0 버전을 다운로드 받았다.

Untitled

자바 설치 진행

  • C 드라이브에 폴더 생성 : hadoob
  • 다운로드 받은 파일 4개를 C/hadoob 에 복사하여 옮긴다

Untitled

  • 관리자 권한으로 실행 : jdk-8u311-windows-x64

Untitled

  • 계속 Next 버튼 클릭 후, 아래 파일에서 경로를 수정한다. (이 때, Program Files 공백이 있는데, 이러한 공백은 환경 설치 시 문제가 될 수 있다.)
  • Development Tools 선택
  • change 버튼으로 변경을 진행한다.

Untitled

  • c 드라이브 경로로 이동
  • Foldername: jdk 입력
    • 다음 그림과 같아야 한다.

Untitled

  • Java를 다른 폴더에 설치하려 한다.
  • 변경(C)…

Untitled

  • c 드라이브 경로에서 ‘새 폴더 만들기(M)’
  • 폴더 생성 : jre

Untitled

  • 다음과 같이 설치 위치가 지정된다.

Untitled

  • 성공적으로 설치되었다.

Untitled

winrar 설치 진행

  • 관리자 권한으로 실행 : winrar-x64-611

Untitled

  • 기본 설정으로 설치 진행

Untitled

spark 설치 진행

  • Spark 설치를 진행한다.
  • 설치 파일 우클릭 → Extract to “spark-3.2.0-bin-hadoop3.2|”

Untitled

spark 폴더 생성 및 파일 이동

  • 위 과정 이후 폴더가 생성된다.
  • 파일 이동을 하도록 한다.
    • spark-3.2.0-bin-hadoop3.2 폴더 내 모든 파일을 복사한다.
  • 그 후, C드라이브 하단에 spark 폴더를 생성한 후, 모두 옮긴다.

Untitled

log4j.properties 파일 수정

  • C -> sparkconf → [log4j.properties](http://log4j.properties) 파일을 연다.
  • 해당 파일을 메모장으로 연 후, 아래에서 INFO → ERROR 로 변경한다.
    • 작업 실행 시, 출력하는 모든 logs 값들을 없앨 수 있다.
    • 다음과 같이 설정 후 저장

Untitled

winutils 설치 진행

  • C드라이브에서 winutils-bin 폴더를 차례로 생성한다.
  • 다운로드 받은 winutils 파일을 복사하여 옮긴다.

Untitled

  • 이 파일이 Spark 실행 시, 오류 없이 실행될 수 있도록 파일 사용 권한을 얻도록 한다.
    • 이 때에는 CMD 관리자 권한으로 파일을 열어서 실행한다.
  • 관리자 권한으로 실행 : 명령 프롬프트

Untitled

  • 다음 코드들을 시행

cd c:\winutils\bin

winutils.exe chmod 777 \tmp\hive

  • 만약, ChangeFileModeByMask error (3) 에러 발생 시,

    C드라이브 하단에, tmp\hive 폴더를 차례대로 생성을 한다.

Untitled

  • 실행 결과, 에러가 발생했으므로 C드라이브에 폴더를 생성한다.
  • 폴더 생성 : tmp
    • 폴더 생성 : hive
  • 다시 코드를 실행한다.

winutils.exe chmod 777 \tmp\hive

→ 오류없이 실행되었다.

Untitled

환경변수 설정

  • ‘시스템 환경 변수 편집’ 열기

환경 변수(N)..

Untitled

시스템 환경변수를 설정한다.

  • 각 사용자 계정에 사용자 변수 - 새로 만들기 버튼을 클릭한다.

Untitled

  • 다음과 같이 설정
  • SPARK_HOME 환경변수를 설정한다.

Untitled

  • JAVA_HOME 환경변수를 설정한다.

Untitled

  • HADOOP_HOME 환경변수를 설정한다.

Untitled

  • 환경변수를 편집한다.
  • Path 선택 → 편집(E)…

Untitled

  • 아래 코드를 추가한다.
  • 새로 만들기
    • %SPARK_HOME%\bin
    • %JAVA_HOME%\bin

Untitled

파이썬 환경설정

  • Python 환경설정을 추가한다.
  • PYSPARK_PYTHON 환경변수를 설정한다.

Untitled

  • PYSPARK_DRIVER_PYTHON 환경변수를 설정한다.
  • 일단 지운다.

Untitled

  • PYSPARK_DRIVER_PYTHON_OPTS 환경변수를 설정한다.
  • 일단 삭제한다.

Untitled

스파크 테스트

  • 명령 프롬프트에서 진행

→ c:\spark 폴더로 경로를 설정 한다.

pyspark

Untitled

  • 이번에는 [README.md](http://README.md) 파일을 불러와서 아래 코드가 실행되는지 확인한다.
  • 다음 코드를 실행해 본다.

rd = sc.textFile("README.md")

rd.count()

→ 다음 결과 출력 시 성공.

Untitled

pyspark 실습_1

pyspark_실습_2

pyspark_실습_3

Spark on linux

Spark UI

Spark ML

Airflow 실습03

Elastic search 질의

  • 실무 예제로 배우는 데이터 공학 83p

관리자 권한으로 실행 : Ubuntu

→ 경로 이동 : …airflow/

source venv/bin/activate

code .

→ VSCord가 자동 실행된다

→파일 생성 : e_query.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pandas as pd
from pandas.io.json import json_normalize
from elasticsearch import Elasticsearch

# Elasticsearch 객체 생성
es = Elasticsearch()

# 일래스틱서치에 보낼 문서 본문(질의 요청) JSON 객체를 만든다.
# Matchall 검색 사용
doc = {"query" : {"match_all": {}}}
res = es.search(index="users", body = doc, size = 500)
# print(res['hits']['hits'])

# 루프로 문서를 훑으면서 각 문서의 _source 필드만 출력한다.
# for doc in res['hits']['hits']:
# print(doc['_source'])

# 질의 결과를 pandas DataFrame에 넣는 것도 가능
df = json_normalize(res['hits']['hits'])
print(df.head())
print(df.info())

print(df['_source.city'].value_counts())

postgreSQL → Elastic search 데이터 전송

  • 교재 88p
  • Elastic search 가동된 상태에서 진행

→ 선행 학습 링크 참고 : postgreSQL 실습 (notion.so)

  • pgAdmin 준비된 상태에서 진행

→ 다음과 같이 출력되는 상태여야 한다.

Untitled

  • VSCord 에서 작업
  • 교재 88p

dags 폴더 아래에 파일 생성

→ 파일 생성 : airflodb.py

→ 코드 작성

import datetime as dt

from datetime import timedelta

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.python import PythonOperator

import pandas as pd

import psycopg2 as db

from elasticsearch import Elasticsearch

print("Hello")

→ 경로 이동 : (venv) kmk3593@DESKTOP-LNQ780K:/mnt/c/airflow-test/dags$

→ 실행

→ Hello 가 출력되었으므로 성공.

Untitled

  • 코드 추가 작성
  • 다음 내용을 airflodb.py에 작성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

# queryPostgresql 지정
def queryPostgresql():
conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'"
conn=db.connect(conn_string)
print("DB connecting....", conn)

# 데이터 추출
df = pd.read_sql("select name, city from users", conn)
df.to_csv("postgresqldata.csv")
print("----Data Saved----")

# insertElasticSearch
def insertDataElasticsearch():

# Elastic 인스턴스 생성
es = Elasticsearch()

# 데이터 불러오기
df = pd.read_csv("postgresqldata.csv")
for i, r in df.iterrows():
doc = r.to_json()
res = es.index(
index="frompostgresql"
, doc_type="doc", body=doc
)
print(res)

# DAG를 위한 인수들을 지정
default_args = {
'owner' : 'human',
'start_date' : dt.datetime(2022, 4, 18),
'retries' : 1,
'retry_delay': dt.timedelta(minutes = 5)
}

with DAG('MyDBdag',
default_args = default_args,
schedule_interval = timedelta(minutes=5), # '0 * * * * ',
) as dag:

getData = PythonOperator(
task_id = "QueryPostgreSQL"
, python_callable=queryPostgresql
)

insertData = PythonOperator(
task_id = "InsertDataElasticsearch"
, python_callable = insertDataElasticsearch
)

getData >> insertData
  • Airflow 가동

→ 저장 후 실행

python3 airflodb.py

→ airflow 실행

airflow db init

→(재시도할 경우, 실행 : airflow db reset )

→ Terminal 2개 준비하고 다음 명령 실행

airflow webserver -p 8080

airflow scheduler

→ 다음 주소로 진입

http://localhost:8080/

→ Dags

→ 활성화 : MyDBdag

→ 더블 클릭 : MyDBdag

Untitled

→ Tree

→ Update

→ 다음과 같이 출력되면 성공

Untitled

  • Reference : 실무 예제로 배우는 데이터 공학

Airflow 실습02

데이터베이스를 위한 아파치 에어플로 데이터 파이프라인 구축

실무 예제로 배우는 데이터 공학 87 ~ 91p

관리자 권한으로 실행 : ubuntu

→ elasticsearch 가동하기

Untitled

→ Kibana 가동하기

Untitled

  • 경로 이동, 가상 환경 진입

cd ..cd ..cd mnt/c/airflow-test

source venv/bin/activate

  • 교재의 elastic search 버전을 참고하여 설치

pip3 install elasticsearch==7.17.2

Untitled

  • 교재 80p
  • 일단 vi 대신에 code . 를 사용한다.

code .

( 안 될 경우, Ubuntu를 다시 시작한다)

→ 코드 실행 시, VSCord가 자동으로 시작된다

Untitled

  • 폴더 생성

→ 폴더 생성 : chapter04

→ 파일 생성 : e_search.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

doc={"name": fake.name(),"street": fake.street_address(), "city": fake.city(),"zip":fake.zipcode()}

res=es.index(index="users",doc_type="doc",body=doc)
print(res)

doc={"query":{"match":{"_id":"pDYlOHEBxMEH3Xr-2QPk"}}}
res=es.search(index="users",body=doc,size=10)
print(res)
  • 가상환경 가동 후 실행

→ 저장

→ 터미널

source venv/bin/activate

cd chapter04/

python3 e_search.py

  • 교재 81p

→ 파일 작성 : e_search02.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake=Faker()
es = Elasticsearch() #or pi {127.0.0.1}

actions = [
{
"_index": "users",
"_type": "doc",
"_source": {
"name": fake.name(),
"street": fake.street_address(),
"city": fake.city(),
"zip":fake.zipcode()}
}
for x in range(998) # or for i,r in df.iterrows()
]

response = helpers.bulk(es, actions)
print(response)

→ 저장 후 실행

python3 e_search02.py

→ 다음과 같이 (998,[]) 출력되면 성공

Untitled

  • Kibana 페이지 실행

→ 주소창에 입력 : localhost:5601/

→ 메뉴바

→ Stack Management

Untitled

→ Index Patterns

Untitled

→ Create index pattern

→ 이름 : users

Untitled

→ Create index pattern

Untitled

→ 햄버거 메뉴바 열기

→ Discover

Untitled

→ 앞에서 추가한 index의 문서를 확인할 수 있다.

Untitled

데이터 저장소

  • RDBMS

— 종류 : Oracle, PostgreSQL, MySQL, 빅쿼리(구글),…

— 표준 SQL (하나를 잘 알면, 거의 비슷!)

  • NoSQL

— 종류 : Elasticsearch, 몽고 DB (무료 버전)

어려운 것 조회 하는 방법이 RDMBS ≠ NoSQL 다름 (완전 다름!)