This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkSession to create Datasets. For applications that use custom classes or third-party libraries, we can also add code dependencies to spark-submit through its --py-files argument by packaging them into a .zip file (see spark-submit --help for details). SimpleApp is simple enough that we do not need to specify any code dependencies.
We can run this application using the bin/spark-submit script:
→ mkdir data
→ cd data
→ ls
→ vi README.md
→ Paste the content copied above.
→ :wq
→ Check the contents: cat README.md
→ cd ..
→ vi SimpleApp.py
→ Write the code shown below.
from pyspark.sql import SparkSession

logFile = "data/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

# Count the lines containing 'a' and the lines containing 'b', then print the totals
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()
import pandas as pd
from pandas.io.json import json_normalize  # in newer pandas: from pandas import json_normalize
from elasticsearch import Elasticsearch

# Create the Elasticsearch client object
es = Elasticsearch()

# Build the JSON document body (the query request) to send to Elasticsearch.
# Use a match_all search.
doc = {"query": {"match_all": {}}}
res = es.search(index="users", body=doc, size=500)
# print(res['hits']['hits'])

# Loop over the results and print only the _source field of each document.
# for doc in res['hits']['hits']:
#     print(doc['_source'])

# The query results can also be loaded into a pandas DataFrame.
df = json_normalize(res['hits']['hits'])
print(df.head())
print(df.info())
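If only the document bodies are wanted rather than the full hit metadata, each hit's _source can be collected into a plain DataFrame instead. A minimal sketch reusing the res variable from the search above (the names sources and df_users are illustrative):

# Keep just the _source field of each hit
sources = [hit['_source'] for hit in res['hits']['hits']]
df_users = pd.DataFrame(sources)
print(df_users.head())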
import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

# Connect to Elasticsearch and load the exported PostgreSQL data
es = Elasticsearch()
df = pd.read_csv("postgresqldata.csv")

# Index each row of the CSV as a separate document
for i, r in df.iterrows():
    doc = r.to_json()
    res = es.index(index="frompostgresql", doc_type="doc", body=doc)
    print(res)
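The Airflow imports above are there so this CSV-to-Elasticsearch step can run on a schedule inside a DAG. A minimal sketch of that wiring, assuming the loading loop is wrapped in a function named insertElasticsearch; the function name, DAG id, start date, and interval are illustrative, not from the original:

def insertElasticsearch():
    # Connect to Elasticsearch and index each row of the exported CSV
    es = Elasticsearch()
    df = pd.read_csv("postgresqldata.csv")
    for i, r in df.iterrows():
        doc = r.to_json()
        res = es.index(index="frompostgresql", doc_type="doc", body=doc)
        print(res)

default_args = {
    "owner": "airflow",
    "start_date": dt.datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "postgres_to_elasticsearch",              # illustrative DAG id
    default_args=default_args,
    schedule_interval=timedelta(minutes=30),  # illustrative interval
) as dag:
    insert_task = PythonOperator(
        task_id="insert_elasticsearch",
        python_callable=insertElasticsearch,
    )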
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker

fake = Faker()
es = Elasticsearch()  # or point the client at a specific host, e.g. 127.0.0.1

# Generate fake user documents to index into the users index
actions = [
    {
        "_index": "users",
        "_type": "doc",
        "_source": {
            "name": fake.name(),
            "street": fake.street_address(),
            "city": fake.city(),
            "zip": fake.zipcode(),
        },
    }
    for x in range(998)  # or: for i, r in df.iterrows()
]
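Building the actions list does not index anything by itself; it still has to be sent to Elasticsearch as a bulk request. A minimal sketch using the helpers module imported above:

# Send all generated documents in a single bulk request
res = helpers.bulk(es, actions)
print(res)  # (number of successful actions, list of errors)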