로컬로 Pyspark 설정, Pyspark로 첫 번째 ETL 파이프 라인을 구축하십시오.
Python 사용, Jupyter Notebook
Pyspark는 Apache Spark의 Python API입니다.
Apache Spark는 대규모 데이터 처리를위한 분석 엔진입니다.
분산 데이터 처리 엔진으로 클러스터에서 실행됩니다.
클러스터는 3 개 이상의 노드 (또는 컴퓨터)로 구성됩니다.
Spark는 Scala로 작성되었지만 Java, Python 및 R과 같은 다른 주류 언어에 대한 API를 제공합니다. Pyspark는 Python API입니다.
또한 SQL 용 Spark SQL, Pandas Workloads 용 Spark on Pandas API 및 증분 계산 및 스트림 처리를위한 구조적 스트리밍을 포함한 다른 도구 및 언어를 지원합니다.
Pyspark 아키텍처
Apache Spark에는 마스터를“드라이버”라고하며 노예는“노동자”라고합니다.
클러스터 기반 시스템이며 운전자와 여러 근로자로 구성됩니다.
클러스터의 마스터 노드에는 스파크 컨텍스트를 생성하는 드라이버 프로그램이 있습니다.
SparkContext 클러스터의 프로세스를 조정합니다.
작업 실행을 예약하고 클러스터 관리자와 연결합니다.
클러스터 관리자는 스파크 응용 프로그램에 리소스를 할당합니다.
일단 연결되면 Spark는 클러스터의 작업자 노드에서 집행자를 획득합니다.이 클러스터는 계산을 실행하고 응용 프로그램에 대한 데이터를 저장하는 프로세스입니다.
다음으로, 응용 프로그램 코드 (JAR 또는 Python 파일을 SparkContext로) 보냅니다.
마지막으로 SparkContext는 실행자에게 작업을 보냅니다.
독립형 모드에서 PySpark를 실행합니다.
독립형 클러스터는 Spark에 포함 된 간단한 클러스터 관리자로 클러스터를 쉽게 설정할 수 있습니다.
사전 반품
우리는 기계에 Java JDK를 설치해야합니다.
Windows에 있고 Windows 버전을 설치했습니다.
데이터베이스에 데이터를 쿼리하거나 지속하려면 적절한 데이터베이스에 대한 JAR 파일이 필요합니다.
PostgreSQL 및 SQL Server가 있으므로 Maven 저장소 사이트에서 Jars를 다운로드합니다.
JAR 파일은 Java 버전과 호환되어야합니다.
마지막으로, 우리는 컴퓨터에 Pyspark를 설치해야합니다.
첫 번째 Pyspark 앱
이제 우리는 첫 번째 Pyspark 응용 프로그램을 개발할 준비가되었습니다.
우리는 Jupyter Notebook을 IDE로 사용할 것입니다.
우리는 맨 위에 필요한 라이브러리를 가져옵니다.
PySpark 응용 프로그램을 실행하려면 Java를 설치해야합니다.
우리는 자바 위치를 스파크로 제공해야합니다.
이를 위해 OS Dot Environ과 함께 Java Home 변수를 설정하고 Java 설치 디렉토리를 제공합니다.
다음으로 Spark 응용 프로그램의 구성을 설정합니다.
스파크 응용 프로그램은 실행하려면 몇 가지 구성 세부 정보가 필요합니다.
PySpark에서 가져온 SparkConf 객체의 도움으로 구성을 제공 할 수 있습니다.
응용 프로그램 이름을 "예"로 설정했습니다.
마스터 노드 URL 로의 로컬 및 JAR 파일의 위치를 설정하십시오.
우리는 스파크 컨텍스트를 만들 준비가되었습니다.
SparkContext 객체를 사용하고 GetOrcreate 기능을 호출합니다.
이 기능을 위해 구성을 제공합니다.
우리는 Sparksession을 호출하여 Spark Context를 제공합니다.
세션을 Spark라는 변수로 저장합니다.
스파크 변수를보고 스파크 응용 프로그램의 세부 사항을 볼 수 있습니다.
위에서 제공 한 구성 세부 정보가 표시됩니다.
또한 Spark UI에 대한 링크가 있습니다.
링크를 클릭하고 Spark UI를 탐색합니다.
우리는 여기에 로그인 한 응용 프로그램에서 수행하는 모든 조치를 볼 수 있습니다.
몇 가지 데이터 프레임을 생성하면 SQL/DateFrame 탭을 다시 방문합니다.
우리의 스파크 응용 프로그램이 시작되었습니다.
Spark가있는 CSV 파일을 읽어 보겠습니다.
파일을 데이터 프레임으로 읽습니다.
쇼 기능을 사용하여 데이터 프레임을 표시합니다.
데이터 프레임 작업
이제이 데이터 프레임에서 다양한 작업을 수행 할 수 있습니다.
Spark 웹 사이트에서 전체 목록을 확인할 수 있습니다.
데이터 프레임의 스키마를 인쇄하여 열과 데이터 유형을 볼 수 있습니다.
이 데이터 프레임을 필터 기능으로 필터링 할 수 있습니다.
여기서 우리는 프랑스에 대해서만 데이터 세트를 필터링하고 있습니다.
분석에 필요한 데이터를 필터링 할 수 있습니다.
몇 개의 열에 집중하려면 Dataframe에 열 목록을 제공하여 데이터 프레임을 아래로 줄일 수 있습니다.
이 데이터 프레임에는 우리가 지정한 열의 하위 집합이 포함되어 있습니다.
Group By와 같은 집계 함수를 수행 할 수도 있습니다.
Spark Dataframe API를 사용한 데이터 조작의 몇 가지 예입니다.
Spark-SQL
Spark SQL을 사용 하여이 데이터 세트에 대해 SQL 쿼리를 실행할 수도 있습니다.
우리는 데이터 프레임을 판매라는 임시보기로 저장합니다.
Spark SQL을 사용하여 SQL로 쿼리 할 수 있습니다.
출력은 CAPS 하위 범주에 필터링 된 레코드를 표시합니다.
분산 엔진을 사용 하여이 데이터 세트에 대해 표준 SQL 작업을 수행 할 수 있습니다.
데이터베이스에 쓰십시오
원한다면이 데이터 세트를 데이터베이스에 유지할 수 있습니다.
PostgreSQL의 JAR 파일을 Spark 컨텍스트에 제공했습니다.
자격 증명과 함께 데이터베이스 세부 정보를 선언합니다.
pyspark_sales_table이라는 새 테이블을 추가합니다.
데이터 프레임에서 쓰기 기능을 호출합니다.
우리는 모드 (테이블이 존재하는 경우 테이블을 덮어 쓰는 것)와 같은 몇 가지 구성을 제공합니다. 형식은 JDBC입니다 (JDBC URL, 대상 테이블 이름, 사용자, 암호 및 드라이버를 제공합니다. 마지막으로, 우리는 저장 기능을 호출합니다.
이 셀을 실행하면 데이터 프레임을 PostgreSQL 데이터베이스에 저장합니다.
독립형 모드에서 PySpark를 설정하는 방법을 보여주었습니다.
CSV 파일의 데이터를 읽는 첫 번째 Pyspark 응용 프로그램을 작성했습니다.
Spark에서 Pandas API를 사용하여 DataFrame 작업을 수행했습니다.
또한 Spark-SQL을 사용하여 가져온 데이터 세트를 쿼리했습니다.
마지막으로, 데이터 프레임을 PostgreSQL 데이터베이스에 유지했습니다.
결론
- PySpark가 무엇인지, 로컬로 구성하는 방법을 설명합니다.
- PySpark를 사용하여 데이터를 가져오고 조작하는 것이 얼마나 쉬운 지 보여주었습니다.
- PySpark, Pandas API 및 PostgreSQL을 사용하여 기본 ETL 파이프 라인을 구현했습니다.
- 전체 코드는 여기에서 찾을 수 있습니다