{
"cells": [
{
"cell_type": "markdown",
"id": "848f4d20-86d3-44e8-a864-aa09ca222e59",
"metadata": {},
"source": [
"# Lab 1. Interactive analysis of SF Bay Area Bike Share station data in Apache Spark\n",
"\n",
"## Data description\n",
"\n",
"https://www.kaggle.com/benhamner/sf-bay-area-bike-share\n",
"\n",
"`stations.csv` schema:\n",
"\n",
"```\n",
"id: station ID number\n",
"name: name of station\n",
"lat: latitude\n",
"long: longitude\n",
"dock_count: number of total docks at station\n",
"city: city (San Francisco, Redwood City, Palo Alto, Mountain View, San Jose)\n",
"installation_date: original date that station was installed. If station was moved, it is noted below.\n",
"```\n",
"\n",
"`trips.csv` schema:\n",
"\n",
"```\n",
"id: numeric ID of bike trip\n",
"duration: time of trip in seconds\n",
"start_date: start date of trip with date and time, in PST\n",
"start_station_name: station name of start station\n",
"start_station_id: numeric reference for start station\n",
"end_date: end date of trip with date and time, in PST\n",
"end_station_name: station name for end station\n",
"end_station_id: numeric reference for end station\n",
"bike_id: ID of bike used\n",
"subscription_type: Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member\n",
"zip_code: Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 128,
"id": "30002669-3799-4a39-831e-d276a4708f9a",
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext, SparkConf "
]
},
{
"cell_type": "code",
"execution_count": 130,
"id": "3ed7b961-7879-4937-ac24-11d615e091b8",
"metadata": {},
"outputs": [],
"source": [
"conf = SparkConf().setAppName(\"L1_interactive_bike_analysis\").setMaster('yarn')"
]
},
{
"cell_type": "code",
"execution_count": 131,
"id": "0da718e8-7ad8-42f1-872f-c8805fd3c41c",
"metadata": {},
"outputs": [],
"source": [
"sc = SparkContext(conf=conf)"
]
},
{
"cell_type": "code",
"execution_count": 133,
"id": "47896fd0-906e-413d-8980-3948d73a3067",
"metadata": {},
"outputs": [],
"source": [
"tripData = sc.textFile(\"tripd_with_error.csv\")\n",
"tripsHeader = tripData.first()\n",
"trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(\",\", -1))"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "6f486cc0-d591-43b8-8c92-a50415f3a141",
"metadata": {},
"outputs": [],
"source": [
"tripData = sc.textFile(\"trips.csv\")\n",
"# remember the header so that it can be filtered out of the data\n",
"tripsHeader = tripData.first()\n",
"trips = tripData.filter(lambda row: row != tripsHeader).map(lambda row: row.split(\",\", -1))\n",
"\n",
"stationData = sc.textFile(\"stations.csv\")\n",
"stationsHeader = stationData.first()\n",
"stations = stationData.filter(lambda row: row != stationsHeader).map(lambda row: row.split(\",\", -1))"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "bb7a57bc-bbef-452f-97b8-95a5412c6aef",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 'id'),\n",
" (1, 'duration'),\n",
" (2, 'start_date'),\n",
" (3, 'start_station_name'),\n",
" (4, 'start_station_id'),\n",
" (5, 'end_date'),\n",
" (6, 'end_station_name'),\n",
" (7, 'end_station_id'),\n",
" (8, 'bike_id'),\n",
" (9, 'subscription_type'),\n",
" (10, 'zip_code')]"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(enumerate(tripsHeader.split(\",\")))"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "537d7ba4-0a47-4041-93f5-71148fa39821",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(0, 'id'),\n",
" (1, 'name'),\n",
" (2, 'lat'),\n",
" (3, 'long'),\n",
" (4, 'dock_count'),\n",
" (5, 'city'),\n",
" (6, 'installation_date')]"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(enumerate(stationsHeader.split(\",\")))"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "9c745434-7d5f-4bc2-8681-4deaf7d764f0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[['4576',\n",
" '63',\n",
" '8/29/2013 14:13',\n",
" 'South Van Ness at Market',\n",
" '66',\n",
" '8/29/2013 14:14',\n",
" 'South Van Ness at Market',\n",
" '66',\n",
" '520',\n",
" 'Subscriber',\n",
" '94127'],\n",
" ['4607',\n",
" '70',\n",
" '8/29/2013 14:42',\n",
" 'San Jose City Hall',\n",
" '10',\n",
" '8/29/2013 14:43',\n",
" 'San Jose City Hall',\n",
" '10',\n",
" '661',\n",
" 'Subscriber',\n",
" '95138']]"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"trips.take(2)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "ab452ac6-83e1-4116-91eb-f7eed5cdd60e",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[['2',\n",
" 'San Jose Diridon Caltrain Station',\n",
" '37.329732',\n",
" '-121.90178200000001',\n",
" '27',\n",
" 'San Jose',\n",
" '8/6/2013'],\n",
" ['3',\n",
" 'San Jose Civic Center',\n",
" '37.330698',\n",
" '-121.888979',\n",
" '15',\n",
" 'San Jose',\n",
" '8/5/2013']]"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"stations.take(2)"
]
},
{
"cell_type": "markdown",
"id": "64f9d459-525c-432b-8715-529a73def432",
"metadata": {},
"source": [
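"Define `stationsIndexed` as a collection of key-value pairs keyed by the first column, the station number. This builds an index on the station ID. Here the key is kept as a string, which works as long as the trip collections are keyed by string IDs as well: `join` only matches keys that are equal, and `'2'` is not equal to `2`."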
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "632d36ea-3112-4466-9b48-618d762e390f",
"metadata": {},
"outputs": [],
"source": [
"stationsIndexed = stations.keyBy(lambda station: station[0])"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "64515328-9ea3-4997-8da4-e0f6102b280f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('2',\n",
" ['2',\n",
" 'San Jose Diridon Caltrain Station',\n",
" '37.329732',\n",
" '-121.90178200000001',\n",
" '27',\n",
" 'San Jose',\n",
" '8/6/2013']),\n",
" ('3',\n",
" ['3',\n",
" 'San Jose Civic Center',\n",
" '37.330698',\n",
" '-121.888979',\n",
" '15',\n",
" 'San Jose',\n",
" '8/5/2013'])]"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"stationsIndexed.take(2)"
]
},
{
"cell_type": "markdown",
"id": "79de40a1-ebf9-408a-826f-ebf95d6490e9",
"metadata": {},
"source": [
"Do the same to index the trips collection by the `start_station_id` and `end_station_id` columns, and store the results in variables such as `tripsByStartTerminals` and `tripsByEndTerminals`."
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "1ea1aa3e-d972-4a64-b2ef-3e60e21093a4",
"metadata": {},
"outputs": [],
"source": [
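"# columns 4 and 7 hold start_station_id and end_station_id (see the header above);\n",
"# the keys stay strings so that they match the string keys of stationsIndexed\n",
"tripsByStartTerminals = trips.keyBy(lambda trip: trip[4])\n",
"tripsByEndTerminals = trips.keyBy(lambda trip: trip[7])"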
]
},
{
"cell_type": "markdown",
"id": "3bcffb6f-68b0-45bf-9359-f97e4ec281b1",
"metadata": {},
"source": [
"Join the collections by key with the `join` function: join `stationsIndexed` with `tripsByStartTerminals`, and `stationsIndexed` with `tripsByEndTerminals`."
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "33248614-5149-4c63-b222-060b5fec4b9d",
"metadata": {},
"outputs": [],
"source": [
"startTrips = stationsIndexed.join(tripsByStartTerminals)\n",
"endTrips = stationsIndexed.join(tripsByEndTerminals)"
]
},
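{
"cell_type": "markdown",
"id": "join-sketch-md",
"metadata": {},
"source": [
"For intuition, here is a minimal pure-Python sketch of the pairs that `join` produces (an inner join on the key). It assumes plain lists of `(key, value)` pairs in place of RDDs. The helper `inner_join` and the sample values are hypothetical and only mimic the shape of `stationsIndexed` and `tripsByStartTerminals`. Spark itself co-locates equal keys with a shuffle instead of building a dictionary. The last line shows why both sides must use the same key type: the `int` key `2` does not match the string key `'2'`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "join-sketch-code",
"metadata": {},
"outputs": [],
"source": [
"# Pure-Python sketch of pair-RDD join semantics (inner join on the key).\n",
"# Hypothetical sample data: station_id -> station name, station_id -> trip label.\n",
"from collections import defaultdict\n",
"\n",
"def inner_join(left, right):\n",
"    # Group the right side by key (Spark does this via a shuffle).\n",
"    right_by_key = defaultdict(list)\n",
"    for key, value in right:\n",
"        right_by_key[key].append(value)\n",
"    # One (key, (left_value, right_value)) element per matching pair;\n",
"    # keys missing on either side are dropped.\n",
"    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]\n",
"\n",
"stations_sample = [('2', 'San Jose Diridon Caltrain Station'), ('3', 'San Jose Civic Center')]\n",
"trips_sample = [('2', 'trip A'), ('2', 'trip B'), ('66', 'trip C')]\n",
"\n",
"joined = inner_join(stations_sample, trips_sample)\n",
"print(joined)\n",
"print(inner_join([(2, 'x')], [('2', 'y')]))"
]
},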
{
"cell_type": "markdown",
"id": "d8db4683-3f4b-46f3-8705-bcbad0f17e0b",
"metadata": {},
"source": [
"Declaring a sequence of transformations builds a directed acyclic graph (DAG) of RDDs. Nothing is computed until an action is called. The resulting graph (the RDD lineage) can be printed for any RDD."
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "d61bec0b-4839-48b6-a8c5-7fbd24b30bf3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(4) PythonRDD[33] at RDD at PythonRDD.scala:53 []\n",
" | MapPartitionsRDD[24] at mapPartitions at PythonRDD.scala:145 []\n",
" | ShuffledRDD[23] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
" +-(4) PairwiseRDD[22] at join at <ipython-input-22-d9b5f12a0e8e>:1 []\n",
" | PythonRDD[21] at join at <ipython-input-22-d9b5f12a0e8e>:1 []\n",
" | UnionRDD[20] at union at NativeMethodAccessorImpl.java:0 []\n",
" | PythonRDD[18] at RDD at PythonRDD.scala:53 []\n",
" | stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | PythonRDD[19] at RDD at PythonRDD.scala:53 []\n",
" | trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []\n"
]
}
],
"source": [
"print(startTrips.toDebugString().decode(\"utf-8\"))"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "6bd0c246-509d-46e1-aad3-2bbe4679d4df",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(4) PythonRDD[32] at RDD at PythonRDD.scala:53 []\n",
" | MapPartitionsRDD[31] at mapPartitions at PythonRDD.scala:145 []\n",
" | ShuffledRDD[30] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
" +-(4) PairwiseRDD[29] at join at <ipython-input-22-d9b5f12a0e8e>:2 []\n",
" | PythonRDD[28] at join at <ipython-input-22-d9b5f12a0e8e>:2 []\n",
" | UnionRDD[27] at union at NativeMethodAccessorImpl.java:0 []\n",
" | PythonRDD[25] at RDD at PythonRDD.scala:53 []\n",
" | stations.csv MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | stations.csv HadoopRDD[12] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | PythonRDD[26] at RDD at PythonRDD.scala:53 []\n",
" | trips.csv MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0 []\n",
" | trips.csv HadoopRDD[9] at textFile at NativeMethodAccessorImpl.java:0 []\n"
]
}
],
"source": [
"print(endTrips.toDebugString().decode(\"utf-8\"))"
]
},
{
"cell_type": "markdown",
"id": "35e6fdec-4552-417a-9a79-5ac3c1604ac6",
"metadata": {},
"source": [
"Execute the declared transformation graphs by calling the `count` action."
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "b3d7bd21-ab5f-4119-8484-d816725cec3b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"669959"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"startTrips.count()"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "cef28792-2d71-49ef-b390-6d6e9ce7320d",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"669959"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"endTrips.count()"
]
},
{
"cell_type": "markdown",
"id": "14c6b808-f130-4265-bbdc-4c02a7fc32ce",
"metadata": {},
"source": [
"If you know the key distribution in advance, you can choose the most suitable way of hashing keys to partitions. For example, if one key occurs orders of magnitude more often than the others, `HashPartitioner` is not the best choice. All the data for that key ends up in a single partition, which loads the compute resources unevenly.\n",
"\n",
"A particular partitioning function can be selected with the RDD method `partitionBy`. For example, for the `stationsIndexed` RDD, `portable_hash` of the key is used, and the number of partitions is set equal to that of the trips RDD."
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "2a17ea3a-f109-40e6-8959-48beed7ca672",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"MapPartitionsRDD[39] at mapPartitions at PythonRDD.scala:145"
]
},
"execution_count": 55,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
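"from pyspark.rdd import portable_hash\n",
"\n",
"# partitionFunc is called with the key itself, not with the (key, value) pair,\n",
"# so the whole key is hashed; partitionBy returns a new RDD\n",
"stationsIndexed.partitionBy(numPartitions=trips.getNumPartitions(), partitionFunc=lambda key: portable_hash(key))"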
]
},
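{
"cell_type": "markdown",
"id": "skew-sketch-md",
"metadata": {},
"source": [
"Here is a minimal sketch of why a very frequent key is a problem for hash partitioning. PySpark places a pair into partition `partitionFunc(key) % numPartitions`. In this sketch, `zlib.crc32` stands in for `portable_hash`, which is an assumption made so that it runs without Spark and gives reproducible results: Python's built-in `hash` of a string changes between processes unless `PYTHONHASHSEED` is fixed. The skewed key distribution is hypothetical. Key `'70'` occurs 1000 times and ten other keys occur 10 times each, so one partition receives at least 1000 of the 1100 records."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "skew-sketch-code",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: hash partitioning of a skewed key distribution.\n",
"# Assumption: zlib.crc32 replaces pyspark.rdd.portable_hash to keep this runnable without Spark.\n",
"import zlib\n",
"from collections import Counter\n",
"\n",
"def partition_of(key, num_partitions):\n",
"    # Same rule that PySpark's partitionBy applies: partitionFunc(key) % numPartitions\n",
"    return zlib.crc32(key.encode('utf-8')) % num_partitions\n",
"\n",
"num_partitions = 4\n",
"# Hypothetical skewed keys: '70' is hot, station IDs '2'..'11' are rare.\n",
"keys = ['70'] * 1000 + [str(i) for i in range(2, 12)] * 10\n",
"\n",
"records_per_partition = Counter(partition_of(k, num_partitions) for k in keys)\n",
"hot_partition = partition_of('70', num_partitions)\n",
"print(records_per_partition)\n",
"print('partition', hot_partition, 'holds', records_per_partition[hot_partition], 'of', len(keys), 'records')"
]
},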
{
"cell_type": "markdown",
"id": "bfb7f922-ac6b-41f4-a1d6-ecd43b60a3ab",
"metadata": {},
"source": [
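"To find out which partitioner is assigned to an RDD, read its `partitioner` attribute. `partitionBy` returns a new RDD, and the result above was not assigned to a variable. As a result, `stationsIndexed` itself is still unpartitioned, its `partitioner` is `None`, and the cell below shows no output."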
]
},
{
"cell_type": "code",
"execution_count": 58,
"id": "8d4353d6-5db9-4e9f-9d3a-20f85c37fcf9",
"metadata": {},
"outputs": [],
"source": [
"stationsIndexed.partitioner"
]
},
{
"cell_type": "markdown",
"id": "797aa041-b72e-4b6a-8992-ee7f1a715096",
"metadata": {},
"source": [
"## Building a data model\n",
"\n",
"For more efficient processing and additional capabilities, we can declare classes for the domain entities and convert the raw string data into that representation."
]
},
{
"cell_type": "code",
"execution_count": 134,
"id": "84aa084c-8e7a-427b-9bd9-1b7e106c1ea2",
"metadata": {},
"outputs": [],
"source": [
"from typing import NamedTuple\n",
"from datetime import datetime"
]
},
{
"cell_type": "code",
"execution_count": 63,
"id": "d92a6f7e-b57a-4f79-836d-07d5e3286012",
"metadata": {},
"outputs": [],
"source": [
"def initStation(stations):\n",
"    # mapPartitions passes an iterator over the rows of one partition\n",
"    class Station(NamedTuple):\n",
"        station_id: int\n",
"        name: str\n",
"        lat: float\n",
"        long: float\n",
"        dockcount: int\n",
"        landmark: str\n",
"        installation: datetime\n",
"\n",
"    for station in stations:\n",
"        # convert the string columns into typed fields, one Station per row\n",
"        yield Station(\n",
"            station_id=int(station[0]),\n",
"            name=station[1],\n",
"            lat=float(station[2]),\n",
"            long=float(station[3]),\n",
"            dockcount=int(station[4]),\n",
"            landmark=station[5],\n",
"            installation=datetime.strptime(station[6], '%m/%d/%Y')\n",
"        )"
]
},
{
"cell_type": "code",
"execution_count": 71,
"id": "a09036d1-2ba1-4184-86af-41ea7505a36b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))"
]
},
"execution_count": 71,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"stationsInternal = stations.mapPartitions(initStation)\n",
"stationsInternal.first()"
]
},
{
"cell_type": "code",
"execution_count": 150,
"id": "9f728f0d-86ee-4d9e-9292-31ca3c1fae35",
"metadata": {},
"outputs": [],
"source": [
"def initTrip(trips):\n",
"    class Trip(NamedTuple):\n",
"        trip_id: int\n",
"        duration: int\n",
"        start_date: datetime\n",
"        start_station_name: str\n",
"        start_station_id: int\n",
"        end_date: datetime\n",
"        end_station_name: str\n",
"        end_station_id: int\n",
"        bike_id: int\n",
"        subscription_type: str\n",
"        zip_code: str\n",
"\n",
"    for trip in trips:\n",
"        try:\n",
"            yield Trip(\n",
"                trip_id=int(trip[0]),\n",
"                duration=int(trip[1]),\n",
"                start_date=datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),\n",
"                start_station_name=trip[3],\n",
"                start_station_id=int(trip[4]),\n",
"                end_date=datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),\n",
"                end_station_name=trip[6],\n",
"                end_station_id=int(trip[7]),\n",
"                bike_id=int(trip[8]),\n",
"                subscription_type=trip[9],\n",
"                zip_code=trip[10]\n",
"            )\n",
"        except (ValueError, IndexError):\n",
"            # skip malformed rows (unparsable numbers or dates, missing columns)\n",
"            pass"
]
},
{
"cell_type": "code",
"execution_count": 151,
"id": "c265426b-f20b-426e-aeba-7295bc797f3b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Trip(trip_id=4607, duration=70, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 14, 43), end_station_name='San Jose City Hall', end_station_id='10', bike_id=661, subscription_type='Subscriber', zip_code='95138'),\n",
" Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'),\n",
" Trip(trip_id=4251, duration=77, start_date=datetime.datetime(2013, 8, 29, 11, 29), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 30), end_station_name='San Jose City Hall', end_station_id='10', bike_id=26, subscription_type='Subscriber', zip_code='95060'),\n",
" Trip(trip_id=4299, duration=83, start_date=datetime.datetime(2013, 8, 29, 12, 2), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 12, 4), end_station_name='Market at 10th', end_station_id='67', bike_id=319, subscription_type='Subscriber', zip_code='94103'),\n",
" Trip(trip_id=4927, duration=103, start_date=datetime.datetime(2013, 8, 29, 18, 54), start_station_name='Golden Gate at Polk', start_station_id=59, end_date=datetime.datetime(2013, 8, 29, 18, 56), end_station_name='Golden Gate at Polk', end_station_id='59', bike_id=527, subscription_type='Subscriber', zip_code='94109'),\n",
" Trip(trip_id=4500, duration=109, start_date=datetime.datetime(2013, 8, 29, 13, 25), start_station_name='Santa Clara at Almaden', start_station_id=4, end_date=datetime.datetime(2013, 8, 29, 13, 27), end_station_name='Adobe on Almaden', end_station_id='5', bike_id=679, subscription_type='Subscriber', zip_code='95112'),\n",
" Trip(trip_id=4563, duration=111, start_date=datetime.datetime(2013, 8, 29, 14, 2), start_station_name='San Salvador at 1st', start_station_id=8, end_date=datetime.datetime(2013, 8, 29, 14, 4), end_station_name='San Salvador at 1st', end_station_id='8', bike_id=687, subscription_type='Subscriber', zip_code='95112'),\n",
" Trip(trip_id=4760, duration=113, start_date=datetime.datetime(2013, 8, 29, 17, 1), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 17, 3), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=553, subscription_type='Subscriber', zip_code='94103'),\n",
" Trip(trip_id=4258, duration=114, start_date=datetime.datetime(2013, 8, 29, 11, 33), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 35), end_station_name='MLK Library', end_station_id='11', bike_id=107, subscription_type='Subscriber', zip_code='95060'),\n",
" Trip(trip_id=4549, duration=125, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='Spear at Folsom', start_station_id=49, end_date=datetime.datetime(2013, 8, 29, 13, 55), end_station_name='Embarcadero at Bryant', end_station_id='54', bike_id=368, subscription_type='Subscriber', zip_code='94109')]"
]
},
"execution_count": 151,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"tripsInternal = trips.mapPartitions(initTrip)\n",
"tripsInternal.take(10)"
]
},
{
"cell_type": "markdown",
"id": "111e1fba-9ffe-4a55-81d7-021041f09865",
"metadata": {},
"source": [
"For each start station, let's find the average trip duration using the `groupByKey` method.\n",
"\n",
"To do this, the trips RDD first has to be turned into an RDD of key-value pairs, the same way we did earlier with `keyBy`."
]
},
{
"cell_type": "code",
"execution_count": 82,
"id": "5f5cf9b5-b470-4042-8c46-15d1afa22527",
"metadata": {},
"outputs": [],
"source": [
"tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)"
]
},
{
"cell_type": "markdown",
"id": "efcfc45a-465f-4e58-a55a-98069804d13f",
"metadata": {},
"source": [
"Compute the average trip duration for each start station"
]
},
{
"cell_type": "code",
"execution_count": 85,
"id": "106a733b-540b-4830-a781-b7bd91fe8785",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"avgDurationByStartStation = tripsByStartStation\\\n",
" .mapValues(lambda trip: trip.duration)\\\n",
" .groupByKey()\\\n",
" .mapValues(lambda trip_durations: np.mean(list(trip_durations)))"
]
},
{
"cell_type": "markdown",
"id": "b8facf2f-77f1-45b2-bf12-b57b95886bfc",
"metadata": {},
"source": [
"Print the 10 stations with the longest average trip duration"
]
},
{
"cell_type": "code",
"execution_count": 94,
"id": "3b8c0ad3-e939-4d6f-916c-1b558e50d17f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 6.79 ms, sys: 5.3 ms, total: 12.1 ms\n",
"Wall time: 127 ms\n"
]
},
{
"data": {
"text/plain": [
"[('University and Emerson', 7090.239417989418),\n",
" ('California Ave Caltrain Station', 4628.005847953216),\n",
" ('Redwood City Public Library', 4579.234741784037),\n",
" ('Park at Olive', 4438.1613333333335),\n",
" ('San Jose Civic Center', 4208.016938519448),\n",
" ('Rengstorff Avenue / California Street', 4174.082373782108),\n",
" ('Redwood City Medical Center', 3959.491961414791),\n",
" ('Palo Alto Caltrain Station', 3210.6489815253435),\n",
" ('San Mateo County Center', 2716.7700348432054),\n",
" ('Broadway at Main', 2481.2537313432836)]"
]
},
"execution_count": 94,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"avgDurationByStartStation.top(10, key=lambda x: x[1])"
]
},
{
"cell_type": "markdown",
"id": "8acf13df-5033-4397-be08-5524b9d01c50",
"metadata": {},
"source": [
"`groupByKey` sends every individual value across the network, which causes heavy data transfer. If the grouping is only a step before reducing the values, it is better to use the `reduceByKey` or `aggregateByKey` transformation. These first reduce the values locally within each partition, and then only the resulting partial aggregates are combined.\n",
"\n",
"*Note.* `reduceByKey` is logically similar to the Combine and Reduce phases of a MapReduce job.\n",
"\n",
"`aggregateByKey` works like `reduceByKey` but takes an initial (zero) value. Its result type may differ from the value type, so it needs two functions: one that merges a value into an accumulator and one that merges two accumulators.\n",
"\n",
"Let's compute the average with `aggregateByKey`. Two values are computed at once for every start station: the sum of trip durations and the number of trips."
]
},
{
"cell_type": "code",
"execution_count": 88,
"id": "53c746c5-ee60-42b9-a1f3-465bbbdaa02b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[0;31mSignature:\u001b[0m\n",
" \u001b[0mtripsByStartStation\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maggregateByKey\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mseqFunc\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mcombFunc\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mnumPartitions\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m \u001b[0mpartitionFunc\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m<\u001b[0m\u001b[0mfunction\u001b[0m \u001b[0mportable_hash\u001b[0m \u001b[0mat\u001b[0m \u001b[0;36m0x7fb9a437c310\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n",
"\u001b[0;34m\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mDocstring:\u001b[0m\n",
"Aggregate the values of each key, using given combine functions and a neutral\n",
"\"zero value\". This function can return a different result type, U, than the type\n",
"of the values in this RDD, V. Thus, we need one operation for merging a V into\n",
"a U and one operation for merging two U's, The former operation is used for merging\n",
"values within a partition, and the latter is used for merging values between\n",
"partitions. To avoid memory allocation, both of these functions are\n",
"allowed to modify and return their first argument instead of creating a new U.\n",
"\u001b[0;31mFile:\u001b[0m ~/.local/lib/python3.9/site-packages/pyspark/rdd.py\n",
"\u001b[0;31mType:\u001b[0m method\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"? tripsByStartStation.aggregateByKey"
]
},
{
"cell_type": "code",
"execution_count": 90,
"id": "fcdafcfb-ed3c-4c18-9051-44cf3632625c",
"metadata": {},
"outputs": [],
"source": [
"def seqFunc(acc, duration):\n",
" duration_sum, count = acc\n",
" return (duration_sum + duration, count + 1)\n",
"\n",
"def combFunc(acc1, acc2):\n",
" duration_sum1, count1 = acc1\n",
" duration_sum2, count2 = acc2\n",
" return (duration_sum1+duration_sum2, count1+count2)\n",
"\n",
"def meanFunc(acc):\n",
" duration_sum, count = acc\n",
" return duration_sum/count\n",
"\n",
"avgDurationByStartStation2 = tripsByStartStation\\\n",
" .mapValues(lambda trip: trip.duration)\\\n",
" .aggregateByKey(\n",
" zeroValue=(0,0),\n",
" seqFunc=seqFunc,\n",
" combFunc=combFunc)\\\n",
" .mapValues(meanFunc)"
]
},
{
"cell_type": "markdown",
"id": "71665ff6-6ce3-48b1-a183-1d02b9c2e80e",
"metadata": {},
"source": [
"`zeroValue` is the initial accumulator value; here it is a pair of zeros. The first function, `seqFunc`, is applied while iterating over the elements of a partition. Spark passes each element's value in as `duration`, and the accumulator `acc` collects the running sum and count. The second function, `combFunc`, reduces the partial results computed locally in different partitions.\n",
"\n",
"Compare the results of `avgDurationByStartStation` and `avgDurationByStartStation2` and their execution times."
]
},
{
"cell_type": "code",
"execution_count": 95,
"id": "c69d5013-f26e-4d6b-8fb6-277390a0f267",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 8.47 ms, sys: 3.23 ms, total: 11.7 ms\n",
"Wall time: 65.6 ms\n"
]
},
{
"data": {
"text/plain": [
"[('University and Emerson', 7090.239417989418),\n",
" ('California Ave Caltrain Station', 4628.005847953216),\n",
" ('Redwood City Public Library', 4579.234741784037),\n",
" ('Park at Olive', 4438.1613333333335),\n",
" ('San Jose Civic Center', 4208.016938519448),\n",
" ('Rengstorff Avenue / California Street', 4174.082373782108),\n",
" ('Redwood City Medical Center', 3959.491961414791),\n",
" ('Palo Alto Caltrain Station', 3210.6489815253435),\n",
" ('San Mateo County Center', 2716.7700348432054),\n",
" ('Broadway at Main', 2481.2537313432836)]"
]
},
"execution_count": 95,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"avgDurationByStartStation2.top(10, key=lambda x: x[1])"
]
},
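{
"cell_type": "markdown",
"id": "aggregate-sketch-md",
"metadata": {},
"source": [
"To make the two phases of `aggregateByKey` concrete, here is a pure-Python simulation on plain lists. The lists in `partitions` stand in for RDD partitions, and the `(station, duration)` pairs are hypothetical. `seq_func` and `comb_func` mirror `seqFunc` and `combFunc` above but use new names so they do not overwrite them. Only the small per-partition accumulators would have to be shuffled, not the individual durations. The final means equal what grouping all values first would give."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aggregate-sketch-code",
"metadata": {},
"outputs": [],
"source": [
"# Simulation of aggregateByKey: per-partition fold, then merge of partial results.\n",
"from collections import defaultdict\n",
"from functools import reduce\n",
"\n",
"def seq_func(acc, duration):\n",
"    duration_sum, count = acc\n",
"    return (duration_sum + duration, count + 1)\n",
"\n",
"def comb_func(acc1, acc2):\n",
"    return (acc1[0] + acc2[0], acc1[1] + acc2[1])\n",
"\n",
"zero_value = (0, 0)\n",
"# Hypothetical data split into two partitions\n",
"partitions = [\n",
"    [('A', 100), ('B', 60), ('A', 200)],\n",
"    [('A', 300), ('B', 90)],\n",
"]\n",
"\n",
"# Phase 1 (inside each partition): fold values into one accumulator per key\n",
"partials = []\n",
"for part in partitions:\n",
"    local = {}\n",
"    for key, duration in part:\n",
"        local[key] = seq_func(local.get(key, zero_value), duration)\n",
"    partials.append(local)\n",
"\n",
"# Phase 2 (after the shuffle): merge the accumulators of each key\n",
"collected = defaultdict(list)\n",
"for local in partials:\n",
"    for key, acc in local.items():\n",
"        collected[key].append(acc)\n",
"totals = {key: reduce(comb_func, accs) for key, accs in collected.items()}\n",
"means = {key: s / c for key, (s, c) in totals.items()}\n",
"print(totals, means)"
]
},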
{
"cell_type": "markdown",
"id": "39ce42ac-d6ed-441d-9236-76fce6b35136",
"metadata": {},
"source": [
"Now let's find the earliest trip for each station. This again requires grouping. Another drawback of `groupByKey` is that all values of a key are collected together. A very large group can therefore exhaust executor memory and fail with `java.lang.OutOfMemoryError` on large data volumes.\n",
"\n",
"Group the trips by start station and pick the trip with the smallest start time from each group:"
]
},
{
"cell_type": "code",
"execution_count": 115,
"id": "2f37ab32-5e2c-4596-ba09-9d6283e9fd6b",
"metadata": {},
"outputs": [],
"source": [
"def earliestTrip(trips):\n",
"    # trips is the iterable of all trips grouped under one station key\n",
"    if trips is None:\n",
"        return None\n",
"    if len(trips) == 0:\n",
"        return trips\n",
"    trips = list(trips)\n",
"    # linear scan for the trip with the smallest start_date\n",
"    min_date = trips[0].start_date\n",
"    min_trip = trips[0]\n",
"    for trip in trips[1:]:\n",
"        if min_date > trip.start_date:\n",
"            min_date = trip.start_date\n",
"            min_trip = trip\n",
"    return min_trip\n",
"\n",
"firstGrouped = tripsByStartStation\\\n",
" .groupByKey()\\\n",
" .mapValues(lambda trips: earliestTrip(trips))"
]
},
{
"cell_type": "code",
"execution_count": 116,
"id": "7f74e315-c036-4992-9111-dff2d4150183",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.15 ms, sys: 15.3 ms, total: 18.4 ms\n",
"Wall time: 25.3 s\n"
]
},
{
"data": {
"text/plain": [
"[('Market at 4th',\n",
" Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),\n",
" ('Market at Sansome',\n",
" Trip(trip_id=4321, duration=505, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=625, subscription_type='Subscriber', zip_code='94110')),\n",
" ('San Jose Diridon Caltrain Station',\n",
" Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),\n",
" ('Howard at 2nd',\n",
" Trip(trip_id=4524, duration=579, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 48), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=358, subscription_type='Subscriber', zip_code='94117')),\n",
" ('San Francisco City Hall',\n",
" Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]"
]
},
"execution_count": 116,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"firstGrouped.take(5)"
]
},
{
"cell_type": "markdown",
"id": "c6020067-3a75-47e8-9bb1-9e36cded29a7",
"metadata": {},
"source": [
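"The `reduceByKey` transformation is more efficient here, because the earlier of two trips can be picked pairwise inside each partition before the shuffle. Trip start times have minute precision, so several trips can share the earliest start time. In that case the two approaches may return different trips for the same station; compare 'Market at Sansome' and 'Howard at 2nd' in the two outputs."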
]
},
{
"cell_type": "code",
"execution_count": 117,
"id": "750007e2-2706-4997-9b35-a44269b55200",
"metadata": {},
"outputs": [],
"source": [
"firstGrouped = tripsByStartStation\\\n",
" .reduceByKey(lambda tripA, tripB: tripA if tripA.start_date < tripB.start_date else tripB)"
]
},
{
"cell_type": "code",
"execution_count": 118,
"id": "b98199bd-26dd-4112-87e4-e9505338bd3c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 15.5 ms, sys: 2.89 ms, total: 18.4 ms\n",
"Wall time: 16 s\n"
]
},
{
"data": {
"text/plain": [
"[('Market at 4th',\n",
" Trip(trip_id=4134, duration=574, start_date=datetime.datetime(2013, 8, 29, 10, 19), start_station_name='Market at 4th', start_station_id=76, end_date=datetime.datetime(2013, 8, 29, 10, 29), end_station_name='2nd at South Park', end_station_id='64', bike_id=426, subscription_type='Customer', zip_code='94117')),\n",
" ('Market at Sansome',\n",
" Trip(trip_id=4320, duration=520, start_date=datetime.datetime(2013, 8, 29, 12, 10), start_station_name='Market at Sansome', start_station_id=77, end_date=datetime.datetime(2013, 8, 29, 12, 19), end_station_name='Harry Bridges Plaza (Ferry Building)', end_station_id='50', bike_id=616, subscription_type='Subscriber', zip_code='94109')),\n",
" ('San Jose Diridon Caltrain Station',\n",
" Trip(trip_id=4547, duration=1580, start_date=datetime.datetime(2013, 8, 29, 13, 52), start_station_name='San Jose Diridon Caltrain Station', start_station_id=2, end_date=datetime.datetime(2013, 8, 29, 14, 18), end_station_name='San Jose City Hall', end_station_id='10', bike_id=107, subscription_type='Customer', zip_code='94306')),\n",
" ('Howard at 2nd',\n",
" Trip(trip_id=4525, duration=650, start_date=datetime.datetime(2013, 8, 29, 13, 39), start_station_name='Howard at 2nd', start_station_id=63, end_date=datetime.datetime(2013, 8, 29, 13, 50), end_station_name='San Francisco Caltrain (Townsend at 4th)', end_station_id='70', bike_id=359, subscription_type='Subscriber', zip_code='94401')),\n",
" ('San Francisco City Hall',\n",
" Trip(trip_id=4265, duration=151, start_date=datetime.datetime(2013, 8, 29, 11, 40), start_station_name='San Francisco City Hall', start_station_id=58, end_date=datetime.datetime(2013, 8, 29, 11, 42), end_station_name='San Francisco City Hall', end_station_id='58', bike_id=520, subscription_type='Subscriber', zip_code='94110'))]"
]
},
"execution_count": 118,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"\n",
"firstGrouped.take(5)"
]
},
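{
"cell_type": "markdown",
"id": "tiebreak-sketch-md",
"metadata": {},
"source": [
"Here is a hedged sketch of a tie-breaking reduce function. It assumes that comparing trips by `(start_date, trip_id)` is an acceptable rule. With that rule, the same trip is kept no matter in which order Spark combines the values, whereas the lambda used above returns its second argument on a tie. `TripStub`, `earlier_trip` and `naive_earlier` are hypothetical names, and the two tied stubs are modelled on the 'Market at Sansome' trips 4320 and 4321 from the outputs above. With Spark, the function could be passed as `tripsByStartStation.reduceByKey(earlier_trip)`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "tiebreak-sketch-code",
"metadata": {},
"outputs": [],
"source": [
"# Deterministic 'earliest trip' reduce function (sketch with hypothetical stub records).\n",
"from datetime import datetime\n",
"from functools import reduce\n",
"from typing import NamedTuple\n",
"\n",
"class TripStub(NamedTuple):\n",
"    trip_id: int\n",
"    start_date: datetime\n",
"\n",
"def earlier_trip(trip_a, trip_b):\n",
"    # Ties on start_date are broken by the smaller trip_id,\n",
"    # so the result does not depend on the combination order.\n",
"    return min(trip_a, trip_b, key=lambda t: (t.start_date, t.trip_id))\n",
"\n",
"def naive_earlier(trip_a, trip_b):\n",
"    # The comparison used in the reduceByKey cell above: on a tie it returns trip_b.\n",
"    return trip_a if trip_a.start_date < trip_b.start_date else trip_b\n",
"\n",
"same_minute = datetime(2013, 8, 29, 12, 10)\n",
"sample = [TripStub(4321, same_minute), TripStub(4320, same_minute), TripStub(5000, datetime(2013, 8, 30, 9, 0))]\n",
"backwards = list(reversed(sample))\n",
"\n",
"print(reduce(earlier_trip, sample).trip_id, reduce(earlier_trip, backwards).trip_id)\n",
"print(reduce(naive_earlier, sample).trip_id, reduce(naive_earlier, backwards).trip_id)"
]
},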
{
"cell_type": "code",
"execution_count": 152,
"id": "c98261f7-283c-4c7e-b915-3778ff972f5f",
"metadata": {},
"outputs": [],
"source": [
"sc.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e8bc4000-9d08-408a-b037-fbd9796ed459",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}