
import pyspark
from pyspark.sql.session import SparkSession  # connect the session
# Create a session
spark = SparkSession.builder.appName('Basics').getOrCreate()
df = spark.read.options(delimiter='|').csv('20240424 전력가스 대용량 (반기별)/전력가스 DB_202001~202006.csv',header=True)
#df = spark.read.option("header",'True').options(delimiter='|').csv('20240424 전력가스 대용량 (반기별)/전력가스 DB_202001~202006.csv')
df.show()
df.select("BASE_YM").show()
df.printSchema()
show() : DataFrame preview method
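A small sketch of show()'s optional arguments, using the df loaded above (n, truncate, and vertical are standard DataFrame.show() parameters):
df.show(3)                     # first 3 rows, long values truncated to 20 chars
df.show(3, truncate=False)     # first 3 rows, full column values
df.show(3, vertical=True)      # print rows vertically, handy for wide tables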
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
# get the SparkSession driver process
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("df_melt.csv")
df.printSchema()
df.show(5, False)
df.createOrReplaceTempView("dfTable")
path = '/.../....csv'
spark_df = spark.read.csv(path,header=True, inferSchema=True)
spark_df.limit(num=5).show()
spark_df.head() # a single Row object
row = spark_df.head()
print('row:',row)
print()
row2 = spark_df.head(1)
print('row2:',row2)
print()
row3 = spark_df.head(3)
print('row3:',row3)
print()
num_cols = [name for name, dtype in spark_df.dtypes if dtype != 'string']
spark_df.select(*num_cols).describe().show()
print("행 개수: ", spark_df.count())
print("열 개수: ", len(spark_df.columns))
# add an upper_c column (uppercase of column c)
df.withColumn('upper_c', F.upper(df.c)).show()
# filter rows where column a equals 1
df.filter(df.a == 1).show()
df.groupBy('color').avg().show()
def plus_mean(pandas_df):
    # subtract the per-group mean of v1 within each pandas group
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupBy('color').applyInPandas(plus_mean, schema=df.schema).show()
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))
df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))
import pandas as pd

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')
# This is similar to a left-join except that we match on nearest key
df1.groupBy('id').cogroup(df2.groupBy('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()
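For intuition, the same nearest-key match can be sketched with plain pandas on the df1/df2 data above (merge_asof pairs each left row with the most recent right row for the same id; both frames must be sorted on the key):
import pandas as pd
pdf1 = pd.DataFrame({'time': [20000101, 20000101, 20000102, 20000102],
                     'id': [1, 2, 1, 2],
                     'v1': [1.0, 2.0, 3.0, 4.0]})
pdf2 = pd.DataFrame({'time': [20000101, 20000101], 'id': [1, 2], 'v2': ['x', 'y']})
pd.merge_asof(pdf1, pdf2, on='time', by='id')  # rows at time 20000102 still pick up the 20000101 v2 values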
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()