Apache Spark: 대규모 데이터 처리를 위한 통합 분석 엔진
// build.sbt
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.5.0",
"org.apache.spark" %% "spark-sql" % "3.5.0",
"org.apache.spark" %% "spark-mllib" % "3.5.0"
)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Scala Learning Guide")
.master("local[*]") // 로컬 모드, 모든 코어 사용
.config("spark.driver.memory", "2g")
.getOrElse()
try {
// Spark 작업 수행
} finally {
spark.stop()
}
import spark.implicits._
// Case class 정의
case class Person(name: String, age: Int, city: String)
// 데이터 생성
val people = Seq(
Person("Alice", 25, "Seoul"),
Person("Bob", 30, "Busan"),
Person("Charlie", 35, "Seoul"),
Person("David", 28, "Incheon")
).toDF()
people.show()
/*
+-------+---+-------+
| name|age| city|
+-------+---+-------+
| Alice| 25| Seoul|
| Bob| 30| Busan|
|Charlie| 35| Seoul|
| David| 28|Incheon|
+-------+---+-------+
*/
// 필터링
people.filter($"age" > 28).show()
// 선택
people.select("name", "city").show()
// 그룹화
people.groupBy("city").count().show()
// 정렬
people.orderBy($"age".desc).show()
// 집계
import org.apache.spark.sql.functions._
people.agg(
avg("age").as("average_age"),
max("age").as("max_age"),
min("age").as("min_age")
).show()
// 임시 뷰 생성
people.createOrReplaceTempView("people")
// SQL 쿼리 실행
val result = spark.sql("""
SELECT city, COUNT(*) as count, AVG(age) as avg_age
FROM people
GROUP BY city
ORDER BY count DESC
""")
result.show()
목표: 웹 서버 액세스 로그 분석하여 인사이트 추출
// 로그 데이터 Case Class
case class AccessLog(
ip: String,
timestamp: java.sql.Timestamp,
method: String,
endpoint: String,
status: Int,
size: Long
)
// 로그 파싱 함수
import java.sql.Timestamp
import java.text.SimpleDateFormat
def parseLog(line: String): Option[AccessLog] = {
val pattern = """(\S+) - - \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) \S+" (\d{3}) (\d+)""".r
line match {
case pattern(ip, timestamp, method, endpoint, status, size) =>
try {
val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z")
val ts = new Timestamp(dateFormat.parse(timestamp).getTime)
Some(AccessLog(ip, ts, method, endpoint, status.toInt, size.toLong))
} catch {
case _: Exception => None
}
case _ => None
}
}
// 로그 파일 로드
val logLines = spark.read.textFile("logs/access.log")
val accessLogs = logLines.flatMap(parseLog).toDF()
accessLogs.show(5)
// 1. 가장 많이 방문된 엔드포인트
accessLogs
.groupBy("endpoint")
.count()
.orderBy($"count".desc)
.limit(10)
.show()
// 2. HTTP 상태 코드 분포
accessLogs
.groupBy("status")
.count()
.orderBy("status")
.show()
// 3. 시간대별 트래픽
accessLogs
.withColumn("hour", hour($"timestamp"))
.groupBy("hour")
.count()
.orderBy("hour")
.show()
// 4. IP별 요청 수 (Top 10)
accessLogs
.groupBy("ip")
.agg(
count("*").as("requests"),
sum("size").as("total_bytes")
)
.orderBy($"requests".desc)
.limit(10)
.show()
// 5. 에러 로그 분석 (4xx, 5xx)
val errorLogs = accessLogs.filter($"status" >= 400)
errorLogs
.groupBy("endpoint", "status")
.count()
.orderBy($"count".desc)
.show()
// Parquet 형식으로 저장
accessLogs
.write
.mode("overwrite")
.partitionBy("status")
.parquet("output/access_logs")
// CSV 형식으로 저장
errorLogs
.write
.mode("overwrite")
.option("header", "true")
.csv("output/error_logs")
import org.apache.spark.sql.streaming._
// 스트리밍 소스 생성 (예: 소켓)
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 워드 카운트
val wordCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy("value")
.count()
// 콘솔 출력
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
// 실시간 로그 스트림
val streamingLogs = spark.readStream
.format("text")
.load("logs/streaming/")
val parsedLogs = streamingLogs
.as[String]
.flatMap(parseLog)
.toDF()
// 윈도우 기반 집계 (5분 단위)
val windowedCounts = parsedLogs
.groupBy(
window($"timestamp", "5 minutes", "1 minute"),
$"status"
)
.count()
// 결과 출력
val query2 = windowedCounts.writeStream
.outputMode("update")
.format("console")
.option("truncate", "false")
.start()
query2.awaitTermination()
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification.LogisticRegression
// 예제 데이터: 이메일 스팸 분류
case class Email(text: String, label: Double)
val emails = Seq(
Email("Get rich quick! Click here!", 1.0),
Email("Meeting at 3pm today", 0.0),
Email("Win a free iPhone now!", 1.0),
Email("Project deadline reminder", 0.0),
Email("Congratulations! You won $1000", 1.0)
).toDF()
emails.show(truncate = false)
// 1. Tokenization
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
// 2. TF-IDF
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(1000)
val idf = new IDF()
.setInputCol("rawFeatures")
.setOutputCol("features")
// 3. 파이프라인 구성
import org.apache.spark.ml.Pipeline
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, idf, lr))
// 4. 모델 학습
val model = pipeline.fit(emails)
// 5. 예측
val testData = Seq(
Email("Special offer just for you!", 0.0),
Email("Can we reschedule the meeting?", 0.0)
).toDF()
val predictions = model.transform(testData)
predictions.select("text", "prediction").show(truncate = false)
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// 데이터 분할
val Array(trainingData, testData) = emails.randomSplit(Array(0.8, 0.2))
// 모델 학습
val trainedModel = pipeline.fit(trainingData)
// 예측
val predictions = trainedModel.transform(testData)
// 평가
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderROC")
val auc = evaluator.evaluate(predictions)
println(s"AUC: $auc")
// 데이터 파티션 재조정
val repartitioned = accessLogs.repartition(10)
// 특정 컬럼 기준 파티셔닝
val partitioned = accessLogs.repartition($"status")
// 자주 사용하는 DataFrame 캐싱
val cachedLogs = accessLogs.cache()
// 메모리 캐시
cachedLogs.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
// 캐시 해제
cachedLogs.unpersist()
import org.apache.spark.sql.functions.broadcast
// 작은 테이블을 브로드캐스트
val smallTable = spark.read.parquet("small_data.parquet")
val largeTable = spark.read.parquet("large_data.parquet")
val joined = largeTable.join(broadcast(smallTable), "key")
// Java Spark
Dataset<Row> df = spark.read().json("data.json");
df.filter(col("age").gt(25)).show();
// Scala Spark (더 간결)
val df = spark.read.json("data.json")
df.filter($"age" > 25).show()
// Java
dataset.map((MapFunction<String, Integer>) s -> s.length(), Encoders.INT());
// Scala
dataset.map(_.length)
다음 사용자 이벤트 로그를 분석하세요:
case class UserEvent(userId: Int, action: String, timestamp: Timestamp, pageId: String)
// 1. 사용자별 세션 수 계산 (30분 이상 비활동 시 새 세션)
// 2. 가장 인기있는 페이지 Top 10
// 3. 사용자별 평균 세션 길이
Collaborative Filtering으로 상품 추천 시스템 구현:
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, itemId: Int, rating: Float)
// ALS 모델 학습 및 추천 생성
로그 데이터에서 이상 패턴 탐지:
// 1. 정상 트래픽 패턴 학습
// 2. 비정상 IP 또는 요청 패턴 식별
// 3. 알림 생성
이번 챕터에서 학습한 내용:
다음 챕터 예고: Chapter 16에서는 SBT, 테스팅, Java 상호운용 등 생태계와 도구를 학습합니다.