pyspark 실습01

사전준비

  • spark on windows 참고하여 세팅
  • 스파크를 설치한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

pyspark 설치

  • git bash를 이용해 폴더를 생성하고 터미널을 연다.

바탕화면 우클릭 : git bash here

mkdir pyspk_project

cd pyspk_project

code .

→ git bash 터미널

Untitled

  • 가상환경 생성 후 pyspark 설치

virtualenv venv

source venv/Scripts/activate

pip install pyspark

Untitled

pyspark 실습_1

  • 폴더 파일 생성

→ 폴더 생성 : chapter01_get_started

→ 파일 생성 : step01_basic.py

→ 코드 작성

import pyspark

print(pyspark.__version__)

→ 저장

→ 경로 이동 : cd chapter01_get_started

→ 실행 : python step01_basic.py

Untitled

  • 이어서 코드작성

→ step01_basic.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import pyspark
print(pyspark.__version__)

from pyspark.sql import SparkSession

# 스파크 세션 초기화
spark = SparkSession.builder.master('local[1]').appName('SampleTutorial').getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

print("rdd Count:", rdd.count())

→ 저장 후 실행

→ 주소창에 입력 : http://localhost:4040/

→ 다음 화면이 출력된다.

  • 교재 278p

Untitled

pyspark 실습_2

  • 슬랙에서 dataset.zip 을 다운로드
  • 압축을 풀고 chapter01_get_started 파일에 복사하여 옮긴다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step02_ratings.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# SparkContext
# RDD

from pyspark import SparkConf, SparkContext
import collections

print("Hello")

def main():
# MasterNode = local
# MapReduce

conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf = conf)

lines = sc.textFile("ml-100k/u.logs")
ratings = lines.map(lambda x: x.split()[2])
print("ratings: ", ratings)

result = ratings.countByValue()
print("result:", result)

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

if __name__ == "__main__":
main()

→ 저장

→ 실행 : python step02_ratings.py

→ 다음 결과가 출력된다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step03_dataloading.py

→ 코드 작성

→ pip install pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Spark SQL 적용

# Spark Session
from pyspark.sql import SparkSession
import pandas as pd

# 스파크 세션 생성
"""
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

# 테이블을 확인하는 코드
print(my_spark.catalog.listDatabases())

# show database
my_spark.sql('show databases').show()

# 현재 DB 확인
my_spark.catalog.currentDatabase()
my_spark.stop()
"""

# CSV 파일 불러오기
spark = SparkSession.builder.master('local[1]').appName('DBTutorial').getOrCreate()
flights = spark.read.option('header', 'true').csv('data/flight_small.csv')
# flights.show(4)

# spark.catalog.currentDatabase()
# flights 테이블을 default DB에 추가함
flights.createOrReplaceTempView('flights')

# print(spark.catalog.listTables('default'))
# spark.sql('show tables from default').show()

# 쿼리 통해서 데이터 저장
query = "FROM Fligths SELECT * LIMIT 10"
query2 = "SELECT * FROM flights 10"

# 스파크에 세션할당
flights10 = spark.sql(query2)
flights10.show()

# Spark 데이터 프레임을 Pandas 데이터 프레임을 변환
pd_flights10 = flights10.toPandas()
print(pd_flights10.head())

→ 저장

→ 실행 : python step03_dataloading.py

Author

minkuen

Posted on

2022-04-26

Updated on

2022-04-29

Licensed under

You need to set install_url to use ShareThis. Please set it in _config.yml.
You forgot to set the business or currency_code for Paypal. Please set it in _config.yml.

Comments

You forgot to set the shortname for Disqus. Please set it in _config.yml.