|
|
|
|
#
|
|
|
|
|
# Лабораторная 1. Неинтерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark
|
|
|
|
|
#
|
|
|
|
|
# Для каждой стартовой станции найдем среднее время поездки и самую первую поездку.
|
|
|
|
|
#
|
|
|
|
|
# Команда для запуска
|
|
|
|
|
# spark-submit --master yarn --deploy-mode cluster L1_noninteractive_bike_analysis.py
|
|
|
|
|
#
|
|
|
|
|
# используйте --conf spark.yarn.submit.waitAppCompletion=true, чтобы ждать завершения выполнения спарк задачи
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
from pyspark import SparkContext, SparkConf
|
|
|
|
|
from typing import NamedTuple
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
|
conf = SparkConf().setAppName("Lab1_Script")
|
|
|
|
|
# conf.set("spark.yarn.submit.waitAppCompletion", "false")
|
|
|
|
|
|
|
|
|
|
sc = SparkContext(conf=conf)
|
|
|
|
|
|
|
|
|
|
tripData = sc.textFile("trips.csv")
|
|
|
|
|
# запомним заголовок, чтобы затем его исключить из данных
|
|
|
|
|
tripsHeader = tripData.first()
|
|
|
|
|
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))
|
|
|
|
|
|
|
|
|
|
stationData = sc.textFile("stations.csv")
|
|
|
|
|
stationsHeader = stationData.first()
|
|
|
|
|
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))
|
|
|
|
|
|
|
|
|
|
def initStation(stations):
|
|
|
|
|
class Station(NamedTuple):
|
|
|
|
|
station_id: int
|
|
|
|
|
name: str
|
|
|
|
|
lat: float
|
|
|
|
|
long: float
|
|
|
|
|
dockcount: int
|
|
|
|
|
landmark: str
|
|
|
|
|
installation: str
|
|
|
|
|
|
|
|
|
|
for station in stations:
|
|
|
|
|
yield Station(
|
|
|
|
|
station_id = int(station[0]),
|
|
|
|
|
name = station[1],
|
|
|
|
|
lat = float(station[2]),
|
|
|
|
|
long = float(station[3]),
|
|
|
|
|
dockcount = int(station[4]),
|
|
|
|
|
landmark = station[5],
|
|
|
|
|
installation = datetime.strptime(station[6], '%m/%d/%Y')
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def initTrip(trips):
|
|
|
|
|
class Trip(NamedTuple):
|
|
|
|
|
trip_id: int
|
|
|
|
|
duration: int
|
|
|
|
|
start_date: datetime
|
|
|
|
|
start_station_name: str
|
|
|
|
|
start_station_id: int
|
|
|
|
|
end_date: datetime
|
|
|
|
|
end_station_name: str
|
|
|
|
|
end_station_id: int
|
|
|
|
|
bike_id: int
|
|
|
|
|
subscription_type: str
|
|
|
|
|
zip_code: str
|
|
|
|
|
|
|
|
|
|
for trip in trips:
|
|
|
|
|
yield Trip(
|
|
|
|
|
trip_id = int(trip[0]),
|
|
|
|
|
duration = int(trip[1]),
|
|
|
|
|
start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
|
|
|
|
|
start_station_name = trip[3],
|
|
|
|
|
start_station_id = int(trip[4]),
|
|
|
|
|
end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
|
|
|
|
|
end_station_name = trip[6],
|
|
|
|
|
end_station_id = trip[7],
|
|
|
|
|
bike_id = int(trip[8]),
|
|
|
|
|
subscription_type = trip[9],
|
|
|
|
|
zip_code = trip[10]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
stationsInternal = stations.mapPartitions(initStation)
|
|
|
|
|
tripsInternal = trips.mapPartitions(initTrip)
|
|
|
|
|
|
|
|
|
|
tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)
|
|
|
|
|
|
|
|
|
|
# среднее время поездок стартовой станции
|
|
|
|
|
|
|
|
|
|
def seqFunc(acc, duration):
|
|
|
|
|
duration_sum, count = acc
|
|
|
|
|
return (duration_sum + duration, count + 1)
|
|
|
|
|
|
|
|
|
|
def combFunc(acc1, acc2):
|
|
|
|
|
duration_sum1, count1 = acc1
|
|
|
|
|
duration_sum2, count2 = acc2
|
|
|
|
|
return (duration_sum1+duration_sum2, count1+count2)
|
|
|
|
|
|
|
|
|
|
def meanFunc(acc):
|
|
|
|
|
duration_sum, count = acc
|
|
|
|
|
return duration_sum/count
|
|
|
|
|
|
|
|
|
|
avgDurationByStartStation = tripsByStartStation\
|
|
|
|
|
.mapValues(lambda trip: trip.duration)\
|
|
|
|
|
.aggregateByKey(
|
|
|
|
|
zeroValue=(0,0),
|
|
|
|
|
seqFunc=seqFunc,
|
|
|
|
|
combFunc=combFunc)\
|
|
|
|
|
.mapValues(meanFunc)\
|
|
|
|
|
.sortBy(lambda x: x[1], ascending=False)
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# первые поездки стартовых станций
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
firstStationsTrip = tripsByStartStation\
|
|
|
|
|
.reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
# старт выполнения задач и сохранение результата в HDFS
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
avgDurationByStartStation.saveAsTextFile("avg_duration_of_start_stations")
|
|
|
|
|
firstStationsTrip.saveAsTextFile("first_station_trips")
|
|
|
|
|
|
|
|
|
|
# результаты сохранятся в директории распределённой файловой системы в домашней папке пользователя /user/$USER/:
|
|
|
|
|
# - avg_duration_of_start_stations
|
|
|
|
|
# - first_station_trips
|
|
|
|
|
#
|
|
|
|
|
# получить результаты в виде одного файла можно командой `hadoop fs -getmerge`:
|
|
|
|
|
# hadoop fs -getmerge avg_duration_of_start_stations avg_duration_of_start_stations.txt
|
|
|
|
|
# hadoop fs -getmerge first_station_trips first_station_trips.txt
|
|
|
|
|
|
|
|
|
|
sc.stop()
|