#
# Lab 1. Non-interactive analysis of SF Bay Area Bike Share data in Apache Spark
#
# For every start station, find the average trip duration and the earliest trip.
#
# Submit command:
# spark-submit --master yarn --deploy-mode cluster L1_noninteractive_bike_analysis.py
#
# Pass --conf spark.yarn.submit.waitAppCompletion=true to make spark-submit
# block until the Spark job finishes.
#
from pyspark import SparkContext, SparkConf
from typing import NamedTuple
from datetime import datetime

conf = SparkConf().setAppName("Lab1_Script")
# conf.set("spark.yarn.submit.waitAppCompletion", "false")
sc = SparkContext(conf=conf)
tripData = sc.textFile("trips.csv")
# remember the header line so it can be filtered out of the data
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))
stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))
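
# Optional sanity check (an assumption, not part of the original lab):
# `take` is an action, so it runs a small job and shows the first parsed rows.
# print(trips.take(2))
# print(stations.take(2))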
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: datetime

    for station in stations:
        yield Station(
            station_id=int(station[0]),
            name=station[1],
            lat=float(station[2]),
            long=float(station[3]),
            dockcount=int(station[4]),
            landmark=station[5],
            installation=datetime.strptime(station[6], '%m/%d/%Y')
        )
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        yield Trip(
            trip_id=int(trip[0]),
            duration=int(trip[1]),
            start_date=datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
            start_station_name=trip[3],
            start_station_id=int(trip[4]),
            end_date=datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
            end_station_name=trip[6],
            end_station_id=int(trip[7]),
            bike_id=int(trip[8]),
            subscription_type=trip[9],
            zip_code=trip[10]
        )
# parse the raw CSV fields into typed records, one partition at a time
stationsInternal = stations.mapPartitions(initStation)
tripsInternal = trips.mapPartitions(initTrip)
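
# How mapPartitions with a generator works (an illustrative local call with a
# made-up row, not part of the job): the function receives an iterator over one
# partition's rows and lazily yields one typed record per row, so a partition
# is never materialised as a list:
# next(initStation(iter([["2", "Folsom St", "37.33", "-121.88", "15", "San Jose", "8/5/2013"]])))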

# key every trip by its start station name for the per-station aggregations below
tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)
# average trip duration for each start station
def seqFunc(acc, duration):
    duration_sum, count = acc
    return (duration_sum + duration, count + 1)

def combFunc(acc1, acc2):
    duration_sum1, count1 = acc1
    duration_sum2, count2 = acc2
    return (duration_sum1 + duration_sum2, count1 + count2)

def meanFunc(acc):
    duration_sum, count = acc
    return duration_sum / count

avgDurationByStartStation = tripsByStartStation\
    .mapValues(lambda trip: trip.duration)\
    .aggregateByKey(
        zeroValue=(0, 0),
        seqFunc=seqFunc,
        combFunc=combFunc)\
    .mapValues(meanFunc)\
    .sortBy(lambda x: x[1], ascending=False)
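
# A worked trace of the (sum, count) accumulator (illustrative values, not
# from the dataset): for durations 60 and 120 in one partition and 90 in another,
#   seqFunc((0, 0), 60)          -> (60, 1)
#   seqFunc((60, 1), 120)        -> (180, 2)
#   combFunc((180, 2), (90, 1))  -> (270, 3)
#   meanFunc((270, 3))           -> 90.0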
#
# the earliest trip from each start station
#
firstStationsTrip = tripsByStartStation\
    .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)
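
# An equivalent formulation (a sketch, not part of the original lab): pick the
# per-station minimum by start_date directly with min() and a key function.
# firstStationsTrip = tripsByStartStation.reduceByKey(
#     lambda a, b: min(a, b, key=lambda t: t.start_date))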
#
# trigger the jobs and save the results to HDFS
#
avgDurationByStartStation.saveAsTextFile("avg_duration_of_start_stations")
firstStationsTrip.saveAsTextFile("first_station_trips")
# the results are written to the user's HDFS home directory, /user/$USER/:
# - avg_duration_of_start_stations
# - first_station_trips
#
# each result can be pulled into a single local file with `hadoop fs -getmerge`:
# hadoop fs -getmerge avg_duration_of_start_stations avg_duration_of_start_stations.txt
# hadoop fs -getmerge first_station_trips first_station_trips.txt
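
# Alternatively (an assumption, not in the original lab; the output path below
# is hypothetical), collapsing the RDD to a single partition before saving
# yields one part-file per directory -- convenient for small results, but it
# funnels all data through a single task:
# avgDurationByStartStation.coalesce(1).saveAsTextFile("avg_duration_of_start_stations_single")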
sc.stop()