Spark ML

  • Spark로 머신러닝을 사용해 본다.
  • 실용성과 별개로 경험삼아 작성해보는 코드이다.
  • 머신러닝(ML)은 Scikit-Learn을 중점적으로 공부해야 한다.
  • 딥러닝(DL)은 Tensorflow, Pytorch에 포커스를 맞춰야 한다.

사전준비

  • git bash로 VSCord에 들어가 터밀널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark_ml 실습(1)

  • 가상환경 진입하고 폴더, 파일 생성

source venv/Scripts/activate

→ 폴더 생성 : chapter03_ml

cd chapter03_ml

  • 슬랙에서 data.zip 을 다운로드
  • 압축을 풀고 chapter03_ml 폴더에 복사하여 옮긴다.

→ 파일 생성 : step01_regression.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# 세션 할당
spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

# 데이터 불러오기
# StructType 이 과정 생략
data = spark.read.option("header", "true").option("inferSchema", "true").csv("data/realestate.csv")

# 데이터 프레임을 행렬로 변환
assembler = VectorAssembler().setInputCols(['HouseAge', 'DistanceToMRT', 'NumberConvenienceStores']).setOutputCol("features")

# 타겟데이터 설정
df = assembler.transform(data).select("PriceofUnitArea", "features")

# 데이터 분리
trainTest = df.randomSplit([0.5, 0.5])
trainingDF = trainTest[0]
testDF = trainTest[1]

# Decision Tree 클래스 정의
dtr = DecisionTreeRegressor().setFeaturesCol("features").setLabelCol("PriceofUnitArea")

# 모델 학습
model = dtr.fit(trainingDF)
print(model)

# 모델 예측
fullPredictions = model.transform(testDF).cache()

# 예측값과 Label을 분리
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])

# 실제데이터
labels = fullPredictions.select("PriceofUnitArea").rdd.map(lambda x: x[0])

# zip
preds_label = predictions.zip(labels).collect()

for prediction in preds_label:
print(prediction)

# print(data.show())

# 세션 종료
spark.stop()

→ 저장 후 실행

python step01_regression.py

→ 다음과 같이 출력된다.

Untitled

pyspark_ml 실습(2)

→ 파일 생성 : step02_logistic_regression.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 세션 할당
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression # 기억

spark = SparkSession.builder.appName("AppName").getOrCreate()

# 데이터 불러오기
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
print("hello")

# 모델 만들기
# Scikit-Learn 문법과 비슷
mlr = LogisticRegression() # 기억
mlr_model = mlr.fit(training) # 기억

# 로지스틱 회귀, 선형 모델.. 기울기와 상수
print("Coefficients: " + str(mlr_model.coefficients))
print("Intercept: " + str(mlr_model.intercept))

spark.stop()

→ 저장 후 실행

python step02_logistic_regression.py

pyspark_ml 실습(3)

  • pyspark_pipeline

→ 파일 생성 : step03_pipeline.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from tokenize import Token
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

from pyspark.sql import SparkSession

# 세션 할당
spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

# 가상의 데이터 만들기
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Feature Engineering
# 요리 작업

# 요리준비 1단계 : 텍스트를 단어로 분리
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# 요리준비 2단계 : 변환된 텍스트를 숫자로 변환
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# 요리준비 3단계 : 모델을 가져옴
lr = LogisticRegression(maxIter=5, regParam=0.01)

# 요리 시작
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# 메인재료 투하
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])

# 예측
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
row_id, text, prob, prediction = row # 튜플
print(
# 문자열 포맷팅
"(%d, %s) -------> probability=%s, prediction=%f" % (row_id, text, str(prob), prediction)
)

# training.show()

# 세션 종료
spark.stop()

→ 저장 후 실행

python step03_pipeline.py

Untitled

pyspark_ml 실습(3)

→ 파일 생성 : step03_randomforest.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from cProfile import label
from pyspark.sql import SparkSession

# 머신러닝 라이브러리
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 데이터 불러오기
spark = SparkSession.builder.appName("RandomForest").getOrCreate()

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
print(type(data))

# Feature Engineering
# label column
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(data)

# 범주형 데이터 체크, 인덱스화
featureIndexer = VectorIndexer(inputCol='features',
outputCol='IndexedFeatures', maxCategories=4).fit(data)

# 데이터 분리
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# 모델
rf = RandomForestClassifier(labelCol='indexedLabel', # 종속변수
featuresCol='IndexedFeatures', # 독립변수
numTrees=10)

# outputCol='indexedLabel' --> original label로 변환
labelConvereter = IndexToString(inputCol='prediction',
outputCol='predictedLabel', labels=labelIndexer.labels)

# 파이프라인 구축
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConvereter])

# 모델 학습
model = pipeline.fit(trainingData)

# 모델 예측
predictions = model.transform(testData)

# 행에 표시할 것 추출
predictions.select("predictedLabel", 'label', 'features').show(5)

# 모형 평가
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
print("Test Error = %f " % (1.0 - accuracy))

spark.stop()

→ 저장 후 실행

python step04_randomforest.py

Untitled

venv 생성되어 있는 경로로 이동

→ pip install jupyterlab

→ jupyter lab

→ 주피터랩에서 블로그에 올릴 자료 작성 가능.

Spark UI

  • 가상환경을 생성한다.

/mnt/c 경로에서 실행

mkdir temp

cd temp

virtualenv venv

Untitled

  • 가상환경에서 pyspark를 설치한다.

source venv/bin/activate

pip install pyspark

  • 다음 링크 접속

Quick Start - Spark 3.2.1 Documentation (apache.org)

  • 다음 내용을 복사한다.

Untitled

1
2
3
This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkSession to create Datasets. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). SimpleApp is simple enough that we do not need to specify any code dependencies.

We can run this application using the bin/spark-submit script:

mkdir data

cd data

ls

vi README.md

→ 위에서 복사한 내용을 붙여넣는다.

:wq

→ 내용 확인cat README.md

cd ..

vi SimpleApp.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark.sql import SparkSession

logFile = "data/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

input("Typing....")

spark.stop()

→ 저장 후 실행

→ python3 SimpleApp.py

→ 경로 확인 : echo $SPARK_HOME

$SPARK_HOME/bin/spark-submit --master local[4] SimpleApp.py

Untitled

  • 코드 샐행 후
  • 위 결과 참고하여 address 복사

→ 뒤에 :4041을 추가하여 주소창에 입력한다.

(코드 실행 후 나오는 텍스트에서 SparkUI를 확인하자)

→ 주소창에 입력하여 접속 : http://172.19.91.118:4041

→ 다음 화면 출력 시 성공.

Untitled

Spark on Linux

WSL2에서의 Spark 설치 - Data Science | DSChloe

개요

  • 간단하게 PySpark를 설치해보는 과정을 작성한다.
  • WSL2 설치 방법은 다루지 않는다.

필수 파일 설치

  • 설치가 안 되었을 경우에 설치한다.
  • 자바 및 Spark 파일을 설치하도록 한다.
1
2
3
$ sudo apt-get install openjdk-8-jdk
$ sudo wget https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
$ sudo tar -xvzf spark-3.2.0-bin-hadoop3.2.tgz

.bashrc 파일 수정

  • 경로를 다음과 같이 설정한다.
1
2
evan@evan:/mnt/c/hadoop$ pwd
/mnt/c/hadoop
  • 설치한 파일은 다음과 같다.
1
2
evan@evan:/mnt/c/hadoop$ ls
spark-3.2.0-bin-hadoop3.2 spark-3.2.0-bin-hadoop3.2.tgz
  • vi ~/.bashrc 파일을 열고 다음과 같이 코드를 작성한다.
    • 다른 코드는 건드리지 않는다.
    • 마지막 라인에서 작성한다.
1
2
3
4
5
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_HOME=/mnt/c/spark
export PATH=$JAVA_HOME/bin:$PATH
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_PYTHON=/usr/bin/python3

Untitled

테스트

  • pyspark를 실행한다. (경로에 주의한다)
  • SPARK_HOME을 다음과 같이 설정했으니 해당 경로에서 실행.
  • export SPARK_HOME=/mnt/c/spark

경로 이동 : cd ..

cd spark/

source ~/.bashrc

pyspark

Untitled

  • 정상적으로 작동한지 테스트한다.
  • 해당 경로에 README.md 파일이 있다면 시행해보자.

rd = sc.textFile("README.md")

rd.count()

→ 다음과 같이 출력된다면 성공이다.

Untitled

pyspark 실습03

사전준비

  • git bash로 VSCord에 들어가 터밀널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark 실습(1)

  • 가상환경 진입하고 폴더, 파일 생성

source venv/Scripts/activate

→ 폴더 생성 : chapter02_get_cleansing

  • 슬랙에서 data.zip 을 다운로드
  • 압축을 풀고 chapter02_get_cleansing 파일에 복사하여 옮긴다.

→ 파일 생성 : pipeline.py

Untitled

  • 코드를 작성해본다.

→ 코드 작성

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

print("Hello!")

→ 저장

→ 경로 이동 : cd chapter02_get_cleansing

→ 실행 : python pipeline.py

  • 이어서 코드작성

→ pipeline.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# print("Hello!!")

# 스파크 세션을 생성
spark = SparkSession.builder.master("local[1]").\
appName("quickpipeline").getOrCreate()

# 데이터 불러오기
df = spark.read.csv("data\AA_DFW_2015_Departures_Short.csv.gz"
, header = True)

print("file loaded")

print(df.show())

# remove duration = 0
df = df.filter(df[3] > 0)

# ADD ID column
df = df.withColumn('id', F.monotonically_increasing_id())
df.show()

df.write.csv("data/output.csv", mode = "overwrite")

spark.stop()

→ 저장 후 실행

python pipeline.py

→ output.csv 가 생성되면 성공이다.

Untitled

pyspark 실습(2)

  • 온도를 측정하는 코드를 작성해본다.
  • 슬랙에서 다운로드
    • 1800.csv, book.txt, customer-orders.csv, fakefriends.csv
  • chapter02_get_cleansing/data 파일에 복사하여 옮긴다.

파일 생성 : min_temp.py

→ 코드 작성

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('MinTemperatures')

sc = SparkContext(conf = conf)

print("Hello")

→ 저장 후 실행

python min_temp.py

  • 이어서 코드작성

→ min_temp.py를 다음과 같이 작성

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster(‘local’).setAppName(‘MinTemperatures’)

sc = SparkContext(conf = conf)

print(“Begins…”)

def parseLine(line):

fileds = line.split(‘,’) # 문자열을 split

stationID = fileds[0]

entryType = fileds[2]

temperature = float(fileds[3]) * 0.1 * (9.0 / 5.0) + 32.0

return (stationID, entryType, temperature)

lines = sc.textFile(‘data/1800.csv’)

#print(lines)

parseLines = lines.map(parseLine)

#print(parseLine)

minTemps = parseLine.filter(lambda x : “TMIN” in x[1])

stationTemps = minTemps.map(lambda x: (x[0], x[2]))

minTemps = stationTemps.map(lambda x, y: min(x,y))

results = minTemps.collect()

print(results)

→ 저장 후 실행

python min_temp.py

pyspark 실습(3)

  • 나이를 출력하는 코드를 작성해보자

파일 생성 : friends-by-age.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

def parseLine(line):
fields = line.split(',')
age = int(fields[2])
numFriends = int(fields[3])
return (age, numFriends)

lines = sc.textFile("logs/fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
print(result)

→ 저장 후 실행

python friends-by-age.py

pyspark 실습(4)

파일 생성 : totalspent.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 라이브러리 불러오기
from pyspark import SparkConf, SparkContext

# 사용자 정의 함수
def extractCusPrice(line):
fields = line.split(",")
return (int(fields[0]), float(fields[2]))

# main 함수
def main():

# 스파크 설정
conf = SparkConf().setMaster("local").setAppName('SpentbyCustomer')
sc = SparkContext(conf = conf)

# 데이터 불러오기
input = sc.textFile("data/customer-orders.csv")
# print("is data?")
mappedInput = input.map(extractCusPrice)
totalByCustomer = mappedInput.reduceByKey(lambda x, y : x + y)
# 정렬
filpped = totalByCustomer.map(lambda x: (x[1], x[0]))
totalByCustomerStored = filpped.sortByKey()

results = totalByCustomer.collect()
for result in results:
print(result)

# 실행 코드
if __name__ == "__main__":
main()

→ 저장 후 실행

python totalspent.py

pyspark 실습02

사전준비

  • git bash로 VSCord에 들어가 터미널을 연다.

바탕화면 우클릭 : git bash here

cd pyspk_project

code .

→ git bash 터미널

pyspark 실습(1)

  • 가상환경 진입하고 파일 생성

source venv/Scripts/activate

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step04_structype.py

키워드 : Struct Type

구글링 : Spark Struct Type, Spark Struct

참고 링크 :

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from struct import Struct
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# 세션 할당 (필수)
# spark = SparkSession.builder.appName("")
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True),
StructField("movieID", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
toMovieIds = movies_df.groupBy("movieID").count().orderBy(func.desc('count'))

print(movies_df.show(10))

# 세션 종료
spark.stop()

→ 경로 이동 : cd chapter01_get_started

→ 저장 후 실행

python step04_structype.py

→ 다음 테이블이 출력되어야 한다.

Untitled

pyspark 실습(2)

→ chapter01_get_starged 폴더에서 파일 생성

→ 파일 생성 : step05_advancestructype.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

print("Hello")

def loadMovieNames():
movieNames = {}
with codecs.open("ml-100k/u.ITEM", "r", encoding="ISO-8859-1", errors="ignore") as f:
for line in f:
fields = line.split("|")
movieNames[int(fields[0])] = fields[1]
return movieNames

# 세션 할당
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# 파이썬 딕셔너리 객체를 Spark 객체로 변환
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# 스키마 작성 (u.logs 데이터)
schema = StructType(
[
StructField("userID", IntegerType(), True)
, StructField("movieID", IntegerType(), True)
, StructField("rating", IntegerType(), True)
, StructField("timestamp", LongType(), True)
]
)

print("Schema is done")

# 데이터 불러오기
movies_df = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.logs")

# 내림차순으로 인기있는 영화 정렬
# movieID 그룹바이. count() orderby
topMovieIds = movies_df.groupBy("movieID").count()

# 딕셔너리
# key-value
# 키 값을 알면 value 자동으로 가져옴 (movieTitle)
def lookupName(movieID):
return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

# MovieTitle 기존 topMovieIds 데이터에 추가
# 컬럼을 추가
moviesWithNames = topMovieIds.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

final_df = moviesWithNames.orderBy(func.desc("count"))

print(final_df.show(10))

# 세션 종료
spark.stop()

→ 저장 후 실행

python step05_advancestructype.py

→ 다음과 같이 출력된다.

Untitled

pyspark 실습01

사전준비

  • spark on windows 참고하여 세팅
  • 스파크를 설치한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

pyspark 설치

  • git bash를 이용해 폴더를 생성하고 터미널을 연다.

바탕화면 우클릭 : git bash here

mkdir pyspk_project

cd pyspk_project

code .

→ git bash 터미널

Untitled

  • 가상환경 생성 후 pyspark 설치

virtualenv venv

source venv/Scripts/activate

pip install pyspark

Untitled

pyspark 실습_1

  • 폴더 파일 생성

→ 폴더 생성 : chapter01_get_started

→ 파일 생성 : step01_basic.py

→ 코드 작성

import pyspark

print(pyspark.__version__)

→ 저장

→ 경로 이동 : cd chapter01_get_started

→ 실행 : python step01_basic.py

Untitled

  • 이어서 코드작성

→ step01_basic.py를 다음과 같이 작성

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import pyspark
print(pyspark.__version__)

from pyspark.sql import SparkSession

# 스파크 세션 초기화
spark = SparkSession.builder.master('local[1]').appName('SampleTutorial').getOrCreate()
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

print("rdd Count:", rdd.count())

→ 저장 후 실행

→ 주소창에 입력 : http://localhost:4040/

→ 다음 화면이 출력된다.

  • 교재 278p

Untitled

pyspark 실습_2

  • 슬랙에서 dataset.zip 을 다운로드
  • 압축을 풀고 chapter01_get_started 파일에 복사하여 옮긴다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step02_ratings.py

→ 코드 작성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# SparkContext
# RDD

from pyspark import SparkConf, SparkContext
import collections

print("Hello")

def main():
# MasterNode = local
# MapReduce

conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')
sc = SparkContext(conf = conf)

lines = sc.textFile("ml-100k/u.logs")
ratings = lines.map(lambda x: x.split()[2])
print("ratings: ", ratings)

result = ratings.countByValue()
print("result:", result)

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

if __name__ == "__main__":
main()

→ 저장

→ 실행 : python step02_ratings.py

→ 다음 결과가 출력된다.

Untitled

  • VSCord에서 작업

→ 파일 생성 : step03_dataloading.py

→ 코드 작성

→ pip install pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Spark SQL 적용

# Spark Session
from pyspark.sql import SparkSession
import pandas as pd

# 스파크 세션 생성
"""
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

# 테이블을 확인하는 코드
print(my_spark.catalog.listDatabases())

# show database
my_spark.sql('show databases').show()

# 현재 DB 확인
my_spark.catalog.currentDatabase()
my_spark.stop()
"""

# CSV 파일 불러오기
spark = SparkSession.builder.master('local[1]').appName('DBTutorial').getOrCreate()
flights = spark.read.option('header', 'true').csv('data/flight_small.csv')
# flights.show(4)

# spark.catalog.currentDatabase()
# flights 테이블을 default DB에 추가함
flights.createOrReplaceTempView('flights')

# print(spark.catalog.listTables('default'))
# spark.sql('show tables from default').show()

# 쿼리 통해서 데이터 저장
query = "FROM Fligths SELECT * LIMIT 10"
query2 = "SELECT * FROM flights 10"

# 스파크에 세션할당
flights10 = spark.sql(query2)
flights10.show()

# Spark 데이터 프레임을 Pandas 데이터 프레임을 변환
pd_flights10 = flights10.toPandas()
print(pd_flights10.head())

→ 저장

→ 실행 : python step03_dataloading.py

Spark on Windows

Spark Installation on Windows 10 - Data Science | DSChloe

사전준비

  • 스파크를 설치하는 과정이다.
  • 사전에 파이썬 3가 설치되어 있어야 한다.
  • 만약, 파이썬이 처음이라면 **Anaconda**를 설치한다.

자바 다운로드

Spark 다운로드

Untitled

WinRAR 다운로드

  • 이 때, .tgz 압축파일을 풀기 위해서는 WinRAR 을 설치한다.

Untitled

winutils 다운로드

  • 이번에는 스파크가 윈도우 로컬 컴퓨터가 Hadoop으로 착각하게 만들 프로그램이 필요하다.
  • 이전에 받은 spark-3.2.0-bin-hadoob-3.2.tgz 와 버전이 일치하는 것을 선택해야 한다.
    • 3.2.0 버전을 다운로드 받았다.

Untitled

자바 설치 진행

  • C 드라이브에 폴더 생성 : hadoob
  • 다운로드 받은 파일 4개를 C/hadoob 에 복사하여 옮긴다

Untitled

  • 관리자 권한으로 실행 : jdk-8u311-windows-x64

Untitled

  • 계속 Next 버튼 클릭 후, 아래 파일에서 경로를 수정한다. (이 때, Program Files 공백이 있는데, 이러한 공백은 환경 설치 시 문제가 될 수 있다.)
  • Development Tools 선택
  • change 버튼으로 변경을 진행한다.

Untitled

  • c 드라이브 경로로 이동
  • Foldername: jdk 입력
    • 다음 그림과 같아야 한다.

Untitled

  • Java를 다른 폴더에 설치하려 한다.
  • 변경(C)…

Untitled

  • c 드라이브 경로에서 ‘새 폴더 만들기(M)’
  • 폴더 생성 : jre

Untitled

  • 다음과 같이 설치 위치가 지정된다.

Untitled

  • 성공적으로 설치되었다.

Untitled

winrar 설치 진행

  • 관리자 권한으로 실행 : winrar-x64-611

Untitled

  • 기본 설정으로 설치 진행

Untitled

spark 설치 진행

  • Spark 설치를 진행한다.
  • 설치 파일 우클릭 → Extract to “spark-3.2.0-bin-hadoop3.2|”

Untitled

spark 폴더 생성 및 파일 이동

  • 위 과정 이후 폴더가 생성된다.
  • 파일 이동을 하도록 한다.
    • spark-3.2.0-bin-hadoop3.2 폴더 내 모든 파일을 복사한다.
  • 그 후, C드라이브 하단에 spark 폴더를 생성한 후, 모두 옮긴다.

Untitled

log4j.properties 파일 수정

  • C -> sparkconf → [log4j.properties](http://log4j.properties) 파일을 연다.
  • 해당 파일을 메모장으로 연 후, 아래에서 INFO → ERROR 로 변경한다.
    • 작업 실행 시, 출력하는 모든 logs 값들을 없앨 수 있다.
    • 다음과 같이 설정 후 저장

Untitled

winutils 설치 진행

  • C드라이브에서 winutils-bin 폴더를 차례로 생성한다.
  • 다운로드 받은 winutils 파일을 복사하여 옮긴다.

Untitled

  • 이 파일이 Spark 실행 시, 오류 없이 실행될 수 있도록 파일 사용 권한을 얻도록 한다.
    • 이 때에는 CMD 관리자 권한으로 파일을 열어서 실행한다.
  • 관리자 권한으로 실행 : 명령 프롬프트

Untitled

  • 다음 코드들을 시행

cd c:\winutils\bin

winutils.exe chmod 777 \tmp\hive

  • 만약, ChangeFileModeByMask error (3) 에러 발생 시,

    C드라이브 하단에, tmp\hive 폴더를 차례대로 생성을 한다.

Untitled

  • 실행 결과, 에러가 발생했으므로 C드라이브에 폴더를 생성한다.
  • 폴더 생성 : tmp
    • 폴더 생성 : hive
  • 다시 코드를 실행한다.

winutils.exe chmod 777 \tmp\hive

→ 오류없이 실행되었다.

Untitled

환경변수 설정

  • ‘시스템 환경 변수 편집’ 열기

환경 변수(N)..

Untitled

시스템 환경변수를 설정한다.

  • 각 사용자 계정에 사용자 변수 - 새로 만들기 버튼을 클릭한다.

Untitled

  • 다음과 같이 설정
  • SPARK_HOME 환경변수를 설정한다.

Untitled

  • JAVA_HOME 환경변수를 설정한다.

Untitled

  • HADOOP_HOME 환경변수를 설정한다.

Untitled

  • 환경변수를 편집한다.
  • Path 선택 → 편집(E)…

Untitled

  • 아래 코드를 추가한다.
  • 새로 만들기
    • %SPARK_HOME%\bin
    • %JAVA_HOME%\bin

Untitled

파이썬 환경설정

  • Python 환경설정을 추가한다.
  • PYSPARK_PYTHON 환경변수를 설정한다.

Untitled

  • PYSPARK_DRIVER_PYTHON 환경변수를 설정한다.
  • 일단 지운다.

Untitled

  • PYSPARK_DRIVER_PYTHON_OPTS 환경변수를 설정한다.
  • 일단 삭제한다.

Untitled

스파크 테스트

  • 명령 프롬프트에서 진행

→ c:\spark 폴더로 경로를 설정 한다.

pyspark

Untitled

  • 이번에는 [README.md](http://README.md) 파일을 불러와서 아래 코드가 실행되는지 확인한다.
  • 다음 코드를 실행해 본다.

rd = sc.textFile("README.md")

rd.count()

→ 다음 결과 출력 시 성공.

Untitled

pyspark 실습_1

pyspark_실습_2

pyspark_실습_3

Spark on linux

Spark UI

Spark ML