Spark ML

  • Spark로 머신러닝을 사용해 본다.
  • 실용성과 별개로 경험삼아 작성해보는 코드이다.
  • 머신러닝(ML)은 Scikit-Learn을 중점적으로 공부해야 한다.
  • 딥러닝(DL)은 Tensorflow, Pytorch에 포커스를 맞춰야 한다.

사전준비

  • git bash로 VSCord에 들어가 터밀널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark_ml 실습(1)

  • 가상환경 진입하고 폴더, 파일 생성

source venv/Scripts/activate

→ 폴더 생성 : chapter03_ml

cd chapter03_ml

  • 슬랙에서 data.zip 을 다운로드
  • 압축을 풀고 chapter03_ml 폴더에 복사하여 옮긴다.

→ 파일 생성 : step01_regression.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# 세션 할당
spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

# 데이터 불러오기
# StructType 이 과정 생략
data = spark.read.option("header", "true").option("inferSchema", "true").csv("data/realestate.csv")

# 데이터 프레임을 행렬로 변환
assembler = VectorAssembler().setInputCols(['HouseAge', 'DistanceToMRT', 'NumberConvenienceStores']).setOutputCol("features")

# 타겟데이터 설정
df = assembler.transform(data).select("PriceofUnitArea", "features")

# 데이터 분리
trainTest = df.randomSplit([0.5, 0.5])
trainingDF = trainTest[0]
testDF = trainTest[1]

# Decision Tree 클래스 정의
dtr = DecisionTreeRegressor().setFeaturesCol("features").setLabelCol("PriceofUnitArea")

# 모델 학습
model = dtr.fit(trainingDF)
print(model)

# 모델 예측
fullPredictions = model.transform(testDF).cache()

# 예측값과 Label을 분리
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])

# 실제데이터
labels = fullPredictions.select("PriceofUnitArea").rdd.map(lambda x: x[0])

# zip
preds_label = predictions.zip(labels).collect()

for prediction in preds_label:
print(prediction)

# print(data.show())

# 세션 종료
spark.stop()

→ 저장 후 실행

python step01_regression.py

→ 다음과 같이 출력된다.

Untitled

pyspark_ml 실습(2)

→ 파일 생성 : step02_logistic_regression.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 세션 할당
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression # 기억

spark = SparkSession.builder.appName("AppName").getOrCreate()

# 데이터 불러오기
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
print("hello")

# 모델 만들기
# Scikit-Learn 문법과 비슷
mlr = LogisticRegression() # 기억
mlr_model = mlr.fit(training) # 기억

# 로지스틱 회귀, 선형 모델.. 기울기와 상수
print("Coefficients: " + str(mlr_model.coefficients))
print("Intercept: " + str(mlr_model.intercept))

spark.stop()

→ 저장 후 실행

python step02_logistic_regression.py

pyspark_ml 실습(3)

  • pyspark_pipeline

→ 파일 생성 : step03_pipeline.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from tokenize import Token
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

from pyspark.sql import SparkSession

# 세션 할당
spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

# 가상의 데이터 만들기
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Feature Engineering
# 요리 작업

# 요리준비 1단계 : 텍스트를 단어로 분리
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# 요리준비 2단계 : 변환된 텍스트를 숫자로 변환
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# 요리준비 3단계 : 모델을 가져옴
lr = LogisticRegression(maxIter=5, regParam=0.01)

# 요리 시작
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# 메인재료 투하
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])

# 예측
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
row_id, text, prob, prediction = row # 튜플
print(
# 문자열 포맷팅
"(%d, %s) -------> probability=%s, prediction=%f" % (row_id, text, str(prob), prediction)
)

# training.show()

# 세션 종료
spark.stop()

→ 저장 후 실행

python step03_pipeline.py

Untitled

pyspark_ml 실습(3)

→ 파일 생성 : step03_randomforest.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from cProfile import label
from pyspark.sql import SparkSession

# 머신러닝 라이브러리
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 데이터 불러오기
spark = SparkSession.builder.appName("RandomForest").getOrCreate()

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
print(type(data))

# Feature Engineering
# label column
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(data)

# 범주형 데이터 체크, 인덱스화
featureIndexer = VectorIndexer(inputCol='features',
outputCol='IndexedFeatures', maxCategories=4).fit(data)

# 데이터 분리
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# 모델
rf = RandomForestClassifier(labelCol='indexedLabel', # 종속변수
featuresCol='IndexedFeatures', # 독립변수
numTrees=10)

# outputCol='indexedLabel' --> original label로 변환
labelConvereter = IndexToString(inputCol='prediction',
outputCol='predictedLabel', labels=labelIndexer.labels)

# 파이프라인 구축
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConvereter])

# 모델 학습
model = pipeline.fit(trainingData)

# 모델 예측
predictions = model.transform(testData)

# 행에 표시할 것 추출
predictions.select("predictedLabel", 'label', 'features').show(5)

# 모형 평가
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print("Test Error = %f " % (1.0 - accuracy))

spark.stop()

→ 저장 후 실행

python step04_randomforest.py

Untitled

venv 생성되어 있는 경로로 이동

→ pip install jupyterlab

→ jupyter lab

→ 주피터랩에서 블로그에 올릴 자료 작성 가능.

pyspark 실습03

사전준비

  • git bash로 VSCord에 들어가 터밀널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark 실습(1)

  • 가상환경 진입하고 폴더, 파일 생성

source venv/Scripts/activate

→ 폴더 생성 : chapter02_get_cleansing

  • 슬랙에서 data.zip 을 다운로드
  • 압축을 풀고 chapter02_get_cleansing 파일에 복사하여 옮긴다.

→ 파일 생성 : pipeline.py

Untitled

  • 코드를 작성해본다.

→ 코드 작성

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

print("Hello!")

→ 저장

→ 경로 이동 : cd chapter02_get_cleansing

→ 실행 : python pipeline.py

  • 이어서 코드작성

→ pipeline.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# print("Hello!!")

# 스파크 세션을 생성
spark = SparkSession.builder.master("local[1]").\
appName("quickpipeline").getOrCreate()

# 데이터 불러오기
df = spark.read.csv("data\AA_DFW_2015_Departures_Short.csv.gz"
, header = True)

print("file loaded")

print(df.show())

# remove duration = 0
df = df.filter(df[3] > 0)

# ADD ID column
df = df.withColumn('id', F.monotonically_increasing_id())
df.show()

df.write.csv("data/output.csv", mode = "overwrite")

spark.stop()

→ 저장 후 실행

python pipeline.py

→ output.csv 가 생성되면 성공이다.

Untitled

pyspark 실습(2)

  • 온도를 측정하는 코드를 작성해본다.
  • 슬랙에서 다운로드
    • 1800.csv, book.txt, customer-orders.csv, fakefriends.csv
  • chapter02_get_cleansing/data 파일에 복사하여 옮긴다.

파일 생성 : min_temp.py

→ 코드 작성

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('MinTemperatures')

sc = SparkContext(conf = conf)

print("Hello")

→ 저장 후 실행

python min_temp.py

  • 이어서 코드작성

→ min_temp.py를 다음과 같이 작성

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster(‘local’).setAppName(‘MinTemperatures’)

sc = SparkContext(conf = conf)

print(“Begins…”)

def parseLine(line):

fileds = line.split(‘,’) # 문자열을 split

stationID = fileds[0]

entryType = fileds[2]

temperature = float(fileds[3]) * 0.1 * (9.0 / 5.0) + 32.0

return (stationID, entryType, temperature)

lines = sc.textFile(‘data/1800.csv’)

#print(lines)

parseLines = lines.map(parseLine)

#print(parseLine)

minTemps = parseLine.filter(lambda x : “TMIN” in x[1])

stationTemps = minTemps.map(lambda x: (x[0], x[2]))

minTemps = stationTemps.map(lambda x, y: min(x,y))

results = minTemps.collect()

print(results)

→ 저장 후 실행

python min_temp.py

pyspark 실습(3)

  • 나이를 출력하는 코드를 작성해보자

파일 생성 : friends-by-age.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseLine(line):
fields = line.split(',')
age = int(fields[2])
numFriends = int(fields[3])
return (age, numFriends)

lines = sc.textFile("logs/fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
print(result)

→ 저장 후 실행

python friends-by-age.py

pyspark 실습(4)

파일 생성 : totalspent.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 라이브러리 불러오기
from pyspark import SparkConf, SparkContext

# 사용자 정의 함수
def extractCusPrice(line):
fields = line.split(",")
return (int(fields[0]), float(fields[2]))

# main 함수
def main():

# 스파크 설정
conf = SparkConf().setMaster("local").setAppName('SpentbyCustomer')
sc = SparkContext(conf = conf)

# 데이터 불러오기
input = sc.textFile("data/customer-orders.csv")
# print("is data?")
mappedInput = input.map(extractCusPrice)
totalByCustomer = mappedInput.reduceByKey(lambda x, y : x + y)
# 정렬
filpped = totalByCustomer.map(lambda x: (x[1], x[0]))
totalByCustomerStored = filpped.sortByKey()

results = totalByCustomer.collect()
for result in results:
print(result)

# 실행 코드
if __name__ == "__main__":
main()

→ 저장 후 실행

python totalspent.py

pyspark 실습02

사전준비

  • git bash로 VSCord에 들어가 터미널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark 실습(1)

  • 가상환경 진입하고 파일 생성

source venv/Scripts/activate

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step04_structype.py

키워드 : Struct Type

구글링 : Spark Struct Type, Spark Struct

참고 링크 :

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from struct import Struct
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# 세션 할당 (필수)
# spark = SparkSession.builder.appName("")
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True),
StructField("movieID", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
toMovieIds = movies_df.groupBy("movieID").count().orderBy(func.desc('count'))

print(movies_df.show(10))

# 세션 종료
spark.stop()

→ 경로 이동 : cd chapter01_get_started

→ 저장 후 실행

python step04_structype.py

→ 다음 테이블이 출력되어야 한다.

Untitled

pyspark 실습(2)

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step05_advancestructype.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

print("Hello")

def loadMovieNames():
movieNames = {}
with codecs.open("ml-100k/u.ITEM", "r", encoding="ISO-8859-1", errors="ignore") as f:
for line in f:
fields = line.split("|")
movieNames[int(fields[0])] = fields[1]
return movieNames

# 세션 할당
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 파이썬 딕셔너리 객체를 Spark 객체로 변환
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True)
, StructField("movieID", IntegerType(), True)
, StructField("rating", IntegerType(), True)
, StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
topMovieIds = movies_df.groupBy("movieID").count()

# 딕셔너리
# key-value
# 키 값을 알면 value 자동으로 가져옴 (movieTitle)
def lookupName(movieID):
return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

# MovieTitle 기존 topMovieIds 데이터에 추가
# 컬럼을 추가
moviesWithNames = topMovieIds.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

final_df = moviesWithNames.orderBy(func.desc("count"))

print(final_df.show(10))

# 세션 종료
spark.stop()

→ 저장 후 실행

python step05_advancestructype.py

→ 다음과 같이 출력된다.

Untitled

pyspark 실습01

사전준비

  • spark on windows 참고하여 세팅
  • 스파크를 설치한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

pyspark 설치

  • git bash를 이용해 폴더를 생성하고 터미널을 연다.

바탕화면 우클릭 : git bash here

mkdir pyspk_project

cd pyspk_project

code .

→ git bash 터미널

Untitled

  • 가상환경 생성 후 pyspark 설치

virtualenv venv

source venv/Scripts/activate

pip install pyspark

Untitled

pyspark 실습_1

  • 폴더 파일 생성

→ 폴더 생성 : chapter01_get_started

→ 파일 생성 : step01_basic.py

→ 코드 작성

import pyspark

print(pyspark.__version__)

→ 저장

→ 경로 이동 : cd chapter01_get_started

→ 실행 : python step01_basic.py

Untitled

  • 이어서 코드작성

→ step01_basic.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import pyspark
print(pyspark.__version__)

from pyspark.sql import SparkSession

# 스파크 세션 초기화
spark = SparkSession.builder.master('local[1]').appName('SampleTutorial').getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

print("rdd Count:", rdd.count())

→ 저장 후 실행

→ 주소창에 입력 : http://localhost:4040/

→ 다음 화면이 출력된다.

  • 교재 278p

Untitled

pyspark 실습_2

  • 슬랙에서 dataset.zip 을 다운로드
  • 압축을 풀고 chapter01_get_started 파일에 복사하여 옮긴다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step02_ratings.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# SparkContext
# RDD

from pyspark import SparkConf, SparkContext
import collections

print("Hello")

def main():
# MasterNode = local
# MapReduce

conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf = conf)

lines = sc.textFile("ml-100k/u.logs")
ratings = lines.map(lambda x: x.split()[2])
print("ratings: ", ratings)

result = ratings.countByValue()
print("result:", result)

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

if __name__ == "__main__":
main()

→ 저장

→ 실행 : python step02_ratings.py

→ 다음 결과가 출력된다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step03_dataloading.py

→ 코드 작성

→ pip install pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Spark SQL 적용

# Spark Session
from pyspark.sql import SparkSession
import pandas as pd

# 스파크 세션 생성
"""
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

# 테이블을 확인하는 코드
print(my_spark.catalog.listDatabases())

# show database
my_spark.sql('show databases').show()

# 현재 DB 확인
my_spark.catalog.currentDatabase()
my_spark.stop()
"""

# CSV 파일 불러오기
spark = SparkSession.builder.master('local[1]').appName('DBTutorial').getOrCreate()
flights = spark.read.option('header', 'true').csv('data/flight_small.csv')
# flights.show(4)

# spark.catalog.currentDatabase()
# flights 테이블을 default DB에 추가함
flights.createOrReplaceTempView('flights')

# print(spark.catalog.listTables('default'))
# spark.sql('show tables from default').show()

# 쿼리 통해서 데이터 저장
query = "FROM Fligths SELECT * LIMIT 10"
query2 = "SELECT * FROM flights 10"

# 스파크에 세션할당
flights10 = spark.sql(query2)
flights10.show()

# Spark 데이터 프레임을 Pandas 데이터 프레임을 변환
pd_flights10 = flights10.toPandas()
print(pd_flights10.head())

→ 저장

→ 실행 : python step03_dataloading.py