You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
big_data/L1 - Introduction to Apache.../L1_interactive_bike_analysi...

37 KiB

Лабораторная 1. Интерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark

Описание данных

https://www.kaggle.com/benhamner/sf-bay-area-bike-share

stations.csv схема:

id: station ID number
name: name of station
lat: latitude
long: longitude
dock_count: number of total docks at station
city: city (San Francisco, Redwood City, Palo Alto, Mountain View, San Jose)
installation_date: original date that station was installed. If station was moved, it is noted below.

trips.csv схема:

id: numeric ID of bike trip
duration: time of trip in seconds
start_date: start date of trip with date and time, in PST
start_station_name: station name of start station
start_station_id: numeric reference for start station
end_date: end date of trip with date and time, in PST
end_station_name: station name for end station
end_station_id: numeric reference for end station
bike_id: ID of bike used
subscription_type: Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member
zip_code: Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)
In [124]:
from pyspark import SparkContext, SparkConf 
In [125]:
conf = SparkConf().setAppName("L1_interactive_bike_analysis").setMaster('yarn')
In [126]:
sc = SparkContext(conf=conf)
In [10]:
tripData = sc.textFile("trips.csv")
# запомним заголовок, чтобы затем его исключить из данных
tripsHeader = tripData.first()
trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(",", -1))

stationData = sc.textFile("stations.csv")
stationsHeader = stationData.first()
stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(",", -1))
In [19]:
list(enumerate(tripsHeader.split(",")))
Out[19]:
[(0, 'id'),
 (1, 'duration'),
 (2, 'start_date'),
 (3, 'start_station_name'),
 (4, 'start_station_id'),
 (5, 'end_date'),
 (6, 'end_station_name'),
 (7, 'end_station_id'),
 (8, 'bike_id'),
 (9, 'subscription_type'),
 (10, 'zip_code')]
In [20]:
list(enumerate(stationsHeader.split(",")))
Out[20]:
[(0, 'id'),
 (1, 'name'),
 (2, 'lat'),
 (3, 'long'),
 (4, 'dock_count'),
 (5, 'city'),
 (6, 'installation_date')]
In [13]:
trips.take(2)
Out[13]:
[['4576',
  '63',
  '8/29/2013 14:13',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '70',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138']]
In [14]:
stations.take(2)
Out[14]:
[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013']]

Объявите stationsIndexed так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки. Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки

In [15]:
stationsIndexed = stations.keyBy(lambda station: station[0])
In [17]:
stationsIndexed.take(2)
Out[17]:
[('2',
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 ('3',
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013'])]

Аналогичное действие проделайте для индексирования коллекции trips по колонкам start_station_id и end_station_id и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals.

In [21]:
tripsByStartTerminals = 
tripsByEndTerminals = 

Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals.

In [22]:
startTrips = stationsIndexed.join(tripsByStartTerminals)
endTrips = stationsIndexed.join(tripsByEndTerminals)

Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD.

In [29]:
print(startTrips.toDebugString().decode("utf-8"))
(4) PythonRDD[33] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[24] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[23] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[22] at join at <ipython-input-22-d9b5f12a0e8e>:1 []
    |  PythonRDD[21] at join at <ipython-input-22-d9b5f12a0e8e>:1 []
    |  UnionRDD[20] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[18] at RDD at PythonRDD.scala:53 []
    |  stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []
    |  stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[19] at RDD at PythonRDD.scala:53 []
    |  trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []
    |  trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []
In [30]:
print(endTrips.toDebugString().decode("utf-8"))
(4) PythonRDD[32] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[31] at mapPartitions at PythonRDD.scala:145 []
 |  ShuffledRDD[30] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[29] at join at <ipython-input-22-d9b5f12a0e8e>:2 []
    |  PythonRDD[28] at join at <ipython-input-22-d9b5f12a0e8e>:2 []
    |  UnionRDD[27] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[25] at RDD at PythonRDD.scala:53 []
    |  stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []
    |  stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[26] at RDD at PythonRDD.scala:53 []
    |  trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []
    |  trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []

Выполните объявленные графы трансформаций вызовом команды count.

In [31]:
startTrips.count()
Out[31]:
669959
In [32]:
endTrips.count()
Out[32]:
669959

Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам Partition. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование HashPartitioner будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы.

Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD partitionBy. Например, для RDD stationsIndexed выбирается portable_hash(idx) с количеством разделов равным количеству разделов trips RDD.

In [55]:
from pyspark.rdd import portable_hash

stationsIndexed.partitionBy(numPartitions=trips.getNumPartitions(), partitionFunc=lambda x: portable_hash(x[0]))
Out[55]:
MapPartitionsRDD[39] at mapPartitions at PythonRDD.scala:145

Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner.

In [58]:
stationsIndexed.partitioner

Создание модели данных

Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление.

В Scala часто для объявления структур данных используется конструкция case class. Особенностью такого объявления класса являются: автоматическое создание методов доступа get для аргументов конструктора, автоматическое определение методов hashcode и equals, возможность case классов быть разобранными по шаблону (pattern matching).

In [62]:
from typing import NamedTuple
from datetime import datetime
In [63]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )
In [71]:
stationsInternal = stations.mapPartitions(initStation)
stationsInternal.first()
Out[71]:
Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))
In [80]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
        )
In [81]:
tripsInternal = trips.mapPartitions(initTrip)
tripsInternal.first()
Out[81]:
Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

Для каждой стартовой станции найдем среднее время поездки. Будем использовать метод groupByKey.

Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy.

In [82]:
tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)

Рассчитаем среднее время поездки для каждого стартового парковочного места

In [85]:
import numpy as np

avgDurationByStartStation = tripsByStartStation\
 .mapValues(lambda trip: trip.duration)\
 .groupByKey()\
 .mapValues(lambda trip_durations: np.mean(list(trip_durations)))

Выведем первые 10 результатов

In [94]:
%%time

avgDurationByStartStation.top(10, key=lambda x: x[1])
CPU times: user 6.79 ms, sys: 5.3 ms, total: 12.1 ms
Wall time: 127 ms
Out[94]:
[('University and Emerson', 7090.239417989418),
 ('California Ave Caltrain Station', 4628.005847953216),
 ('Redwood City Public Library', 4579.234741784037),
 ('Park at Olive', 4438.1613333333335),
 ('San Jose Civic Center', 4208.016938519448),
 ('Rengstorff Avenue / California Street', 4174.082373782108),
 ('Redwood City Medical Center', 3959.491961414791),
 ('Palo Alto Caltrain Station', 3210.6489815253435),
 ('San Mateo County Center', 2716.7700348432054),
 ('Broadway at Main', 2481.2537313432836)]

Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами.

Примечание. Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce работы.

Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент.

Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок.

In [88]:
? tripsByStartStation.aggregateByKey
Signature:
 tripsByStartStation.aggregateByKey(
    zeroValue,
    seqFunc,
    combFunc,
    numPartitions=None,
    partitionFunc=<function portable_hash at 0x7fb9a437c310>,
)
Docstring:
Aggregate the values of each key, using given combine functions and a neutral
"zero value". This function can return a different result type, U, than the type
of the values in this RDD, V. Thus, we need one operation for merging a V into
a U and one operation for merging two U's, The former operation is used for merging
values within a partition, and the latter is used for merging values between
partitions. To avoid memory allocation, both of these functions are
allowed to modify and return their first argument instead of creating a new U.
File:      ~/.local/lib/python3.9/site-packages/pyspark/rdd.py
Type:      method
In [90]:
def seqFunc(acc, duration):
    duration_sum, count = acc
    return (duration_sum + duration, count + 1)

def combFunc(acc1, acc2):
    duration_sum1, count1 = acc1
    duration_sum2, count2 = acc2
    return (duration_sum1+duration_sum2, count1+count2)

def meanFunc(acc):
    duration_sum, count = acc
    return duration_sum/count

avgDurationByStartStation2 = tripsByStartStation\
  .mapValues(lambda trip: trip.duration)\
  .aggregateByKey(
    zeroValue=(0,0),
    seqFunc=seqFunc,
    combFunc=combFunc)\
  .mapValues(meanFunc)

В zeroValue передаётся начальное значение. В нашем случае это пара нулей. Первая функция seqFunc предназначена для прохода по коллекции партиции. На этом проходе значение элементов помещаются средой в переменную duration, а переменная «аккумулятора» acc накапливает значения. Вторая функция combFunc предназначена для этапа редукции частично посчитанных локальных результатов.

Сравните результаты avgDurationByStartStation и avgDurationByStartStation2 и их время выполнения.

In [95]:
%%time

avgDurationByStartStation2.top(10, key=lambda x: x[1])
CPU times: user 8.47 ms, sys: 3.23 ms, total: 11.7 ms
Wall time: 65.6 ms
Out[95]:
[('University and Emerson', 7090.239417989418),
 ('California Ave Caltrain Station', 4628.005847953216),
 ('Redwood City Public Library', 4579.234741784037),
 ('Park at Olive', 4438.1613333333335),
 ('San Jose Civic Center', 4208.016938519448),
 ('Rengstorff Avenue / California Street', 4174.082373782108),
 ('Redwood City Medical Center', 3959.491961414791),
 ('Palo Alto Caltrain Station', 3210.6489815253435),
 ('San Mateo County Center', 2716.7700348432054),
 ('Broadway at Main', 2481.2537313432836)]

Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком groupByKey данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке OutOfMemoryException для больших объёмов данных.

Найдем самую раннюю поездку для каждой станции. Сгруппируем поездки по станциям, возьмём первую поездку из отсортированного списка:

In [115]:
def earliestTrip(trips):
    if trips is None:
        return None
    if len(trips)==0:
        return trips
    trips = list(trips)
    min_date = trips[0].start_date
    min_trip = trips[0]
    for trip in trips[1:]:
        if min_date > trip.start_date:
            min_date = trip.start_date
            min_trip = trip
    return min_trip

firstGrouped = tripsByStartStation\
  .groupByKey()\
  .mapValues(lambda trips: earliestTrip(trips))
In [116]:
%%time

firstGrouped.take(5)
CPU times: user 3.15 ms, sys: 15.3 ms, total: 18.4 ms
Wall time: 25.3 s
Out[116]:
[('Market at 4th',
  Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),
 ('Market at Sansome',
  Trip(trip_id=4321, duration=505, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=625, subscription_type='Subscriber', zip_code='94110')),
 ('San Jose Diridon Caltrain Station',
  Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),
 ('Howard at 2nd',
  Trip(trip_id=4524, duration=579, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 48), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=358, subscription_type='Subscriber', zip_code='94117')),
 ('San Francisco City Hall',
  Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]

Лучшим вариантом с точки зрения эффективности будет использование трансформации reduceByKey

In [117]:
firstGrouped = tripsByStartStation\
  .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)
In [118]:
%%time

firstGrouped.take(5)
CPU times: user 15.5 ms, sys: 2.89 ms, total: 18.4 ms
Wall time: 16 s
Out[118]:
[('Market at 4th',
  Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),
 ('Market at Sansome',
  Trip(trip_id=4320, duration=520, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=616, subscription_type='Subscriber', zip_code='94109')),
 ('San Jose Diridon Caltrain Station',
  Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),
 ('Howard at 2nd',
  Trip(trip_id=4525, duration=650, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 50), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=359, subscription_type='Subscriber', zip_code='94401')),
 ('San Francisco City Hall',
  Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]
In [123]:
sc.stop()
In [ ]: