{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [] }, "kernelspec": { "name": "python3", "display_name": "Python 3" } }, "cells": [ { "cell_type": "markdown", "metadata": { "id": "82OvPKEiEqjc" }, "source": [ "# Введение в MapReduce модель на Python\n" ] }, { "cell_type": "code", "metadata": { "id": "JQ2cvXLjICmI" }, "source": [ "from typing import NamedTuple # requires python 3.6+\n", "from typing import Iterator" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "yjPHumVwEyEg" }, "source": [ "def MAP(_, row:NamedTuple):\n", " if (row.gender == 'female'):\n", " yield (row.age, row)\n", " \n", "def REDUCE(age:str, rows:Iterator[NamedTuple]):\n", " sum = 0\n", " count = 0\n", " for row in rows:\n", " sum += row.social_contacts\n", " count += 1\n", " if (count > 0):\n", " yield (age, sum/count)\n", " else:\n", " yield (age, 0)" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "vBKMgpG_ilaZ" }, "source": [ "Модель элемента данных" ] }, { "cell_type": "code", "metadata": { "id": "Rv-XIjhTJPx3" }, "source": [ "class User(NamedTuple):\n", " id: int\n", " age: str\n", " social_contacts: int\n", " gender: str" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "5KV0Ze2vQgu5" }, "source": [ "input_collection = [\n", " User(id=0, age=55, gender='male', social_contacts=20),\n", " User(id=1, age=25, gender='female', social_contacts=240),\n", " User(id=2, age=25, gender='female', social_contacts=500),\n", " User(id=3, age=33, gender='female', social_contacts=800)\n", "]" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "YFeqzyZxZIFZ" }, "source": [ "Функция RECORDREADER моделирует чтение элементов с диска или по сети." 
] }, { "cell_type": "code", "metadata": { "id": "S5HR4E_GQoMJ" }, "source": [ "def RECORDREADER():\n", " return [(u.id, u) for u in input_collection]" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "NeEoWla-ROUy", "colab": { "base_uri": "https://localhost:8080/", "height": 85 }, "outputId": "94ca6e0e-4644-4282-acbf-1759d7ba2918" }, "source": [ "list(RECORDREADER())" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(0, User(id=0, age=55, social_contacts=20, gender='male')),\n", " (1, User(id=1, age=25, social_contacts=240, gender='female')),\n", " (2, User(id=2, age=25, social_contacts=500, gender='female')),\n", " (3, User(id=3, age=33, social_contacts=800, gender='female'))]" ] }, "metadata": { "tags": [] }, "execution_count": 20 } ] }, { "cell_type": "code", "metadata": { "id": "YB8orgPSZs8M" }, "source": [ "def flatten(nested_iterable):\n", " for iterable in nested_iterable:\n", " for element in iterable:\n", " yield element" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "74oyvDLaRmd5", "colab": { "base_uri": "https://localhost:8080/", "height": 68 }, "outputId": "c6147702-7153-47c7-a574-d5fe6abe29a8" }, "source": [ "map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))\n", "map_output = list(map_output) # materialize\n", "map_output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(25, User(id=1, age=25, social_contacts=240, gender='female')),\n", " (25, User(id=2, age=25, social_contacts=500, gender='female')),\n", " (33, User(id=3, age=33, social_contacts=800, gender='female'))]" ] }, "metadata": { "tags": [] }, "execution_count": 22 } ] }, { "cell_type": "code", "metadata": { "id": "8ncYDJ3-VzDn" }, "source": [ "def groupbykey(iterable):\n", " t = {}\n", " for (k2, v2) in iterable:\n", " t[k2] = t.get(k2, []) + [v2]\n", " return t.items()" ], "execution_count": 
null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "cKzY_6COWOA2", "colab": { "base_uri": "https://localhost:8080/", "height": 85 }, "outputId": "e6791b12-e409-47e9-bcd4-e9f8ca8611bd" }, "source": [ "shuffle_output = groupbykey(map_output)\n", "shuffle_output = list(shuffle_output)\n", "shuffle_output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(25,\n", " [User(id=1, age=25, social_contacts=240, gender='female'),\n", " User(id=2, age=25, social_contacts=500, gender='female')]),\n", " (33, [User(id=3, age=33, social_contacts=800, gender='female')])]" ] }, "metadata": { "tags": [] }, "execution_count": 24 } ] }, { "cell_type": "code", "metadata": { "id": "NlA7lkDDYL0t", "colab": { "base_uri": "https://localhost:8080/", "height": 34 }, "outputId": "6b25d03f-5c92-4f3b-f500-6d70acd598b7" }, "source": [ "reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))\n", "reduce_output = list(reduce_output)\n", "reduce_output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(25, 370.0), (33, 800.0)]" ] }, "metadata": { "tags": [] }, "execution_count": 25 } ] }, { "cell_type": "markdown", "metadata": { "id": "xf6qhHEtd6bI" }, "source": [ "Все действия одним конвейером!" ] }, { "cell_type": "code", "metadata": { "id": "dZaQGYxCdpw5", "colab": { "base_uri": "https://localhost:8080/", "height": 34 }, "outputId": "3f5c6425-e5c5-49d2-b2cd-ce58a9acc33c" }, "source": [ "list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(25, 370.0), (33, 800.0)]" ] }, "metadata": { "tags": [] }, "execution_count": 26 } ] }, { "cell_type": "markdown", "metadata": { "id": "Vq3EWRIpwSiJ" }, "source": [ "# **MapReduce**\n", "Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. 
Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. \n", "\n", "Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE." ] }, { "cell_type": "code", "metadata": { "id": "V1PZeQMwwVjc" }, "source": [ "def flatten(nested_iterable):\n", " for iterable in nested_iterable:\n", " for element in iterable:\n", " yield element\n", "\n", "def groupbykey(iterable):\n", " t = {}\n", " for (k2, v2) in iterable:\n", " t[k2] = t.get(k2, []) + [v2]\n", " return t.items()\n", "\n", "def MapReduce(RECORDREADER, MAP, REDUCE):\n", " return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "iFIVrimep678" }, "source": [ "## Спецификация MapReduce\n", "\n", "\n", "\n", "```\n", "f (k1, v1) -> (k2,v2)*\n", "g (k2, v2*) -> (k3,v3)*\n", " \n", "mapreduce ((k1,v1)*) -> (k3,v3)*\n", "groupby ((k2,v2)*) -> (k2,v2*)*\n", "flatten (e2**) -> e2*\n", " \n", "mapreduce .map(f).flatten.groupby(k2).map(g).flatten\n", "```\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "QtTFyqke3KGe" }, "source": [ "# Примеры" ] }, { "cell_type": "markdown", "metadata": { "id": "BNhh5763w5Vn" }, "source": [ "## SQL " ] }, { "cell_type": "code", "metadata": { "id": "QkyurnvGxBGk", "colab": { "base_uri": "https://localhost:8080/", "height": 34 }, "outputId": "84761282-d2ba-435a-e8d7-a85150730e10" }, "source": [ "from typing import NamedTuple # requires python 3.6+\n", "from typing import Iterator\n", "\n", "class User(NamedTuple):\n", " id: int\n", " age: str\n", " social_contacts: int\n", " gender: str\n", " \n", "input_collection = [\n", " User(id=0, age=55, gender='male', social_contacts=20),\n", " User(id=1, age=25, gender='female', social_contacts=240),\n", " User(id=2, age=25, gender='female', social_contacts=500),\n", " User(id=3, age=33, gender='female', social_contacts=800)\n", "]\n", "\n", "def MAP(_, 
row:NamedTuple):\n", " if (row.gender == 'female'):\n", " yield (row.age, row)\n", " \n", "def REDUCE(age:str, rows:Iterator[NamedTuple]):\n", " sum = 0\n", " count = 0\n", " for row in rows:\n", " sum += row.social_contacts\n", " count += 1\n", " if (count > 0):\n", " yield (age, sum/count)\n", " else:\n", " yield (age, 0)\n", " \n", "def RECORDREADER():\n", " return [(u.id, u) for u in input_collection]\n", "\n", "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", "output = list(output)\n", "output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(25, 370.0), (33, 800.0)]" ] }, "metadata": { "tags": [] }, "execution_count": 28 } ] }, { "cell_type": "markdown", "metadata": { "id": "kNKYIeerx0nY" }, "source": [ "## Matrix-Vector multiplication " ] }, { "cell_type": "code", "metadata": { "id": "rwcntRcCyi1V", "colab": { "base_uri": "https://localhost:8080/", "height": 102 }, "outputId": "606737ab-6b55-455c-931f-4fc45155f8a9" }, "source": [ "from typing import Iterator\n", "import numpy as np\n", "\n", "mat = np.ones((5,4))\n", "vec = np.random.rand(4) # in-memory vector in all map tasks; unseeded, so the sums below change on every run\n", "\n", "def MAP(coordinates:(int, int), value:float):\n", " i, j = coordinates\n", " yield (i, value*vec[j])\n", " \n", "def REDUCE(i:int, products:Iterator[float]):\n", " sum = 0\n", " for p in products:\n", " sum += p\n", " yield (i, sum)\n", "\n", "def RECORDREADER():\n", " for i in range(mat.shape[0]):\n", " for j in range(mat.shape[1]):\n", " yield ((i, j), mat[i,j])\n", " \n", "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", "output = list(output)\n", "output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[(0, 2.905589809636405),\n", " (1, 2.905589809636405),\n", " (2, 2.905589809636405),\n", " (3, 2.905589809636405),\n", " (4, 2.905589809636405)]" ] }, "metadata": { "tags": [] }, "execution_count": 29 } ] }, { "cell_type": "markdown", "metadata": { "id":
"ruZREYdi2o4O" }, "source": [ "## Inverted index " ] }, { "cell_type": "code", "metadata": { "id": "vt9H9Alf3TYv", "colab": { "base_uri": "https://localhost:8080/", "height": 102 }, "outputId": "51aeffc9-e111-4607-bd84-cfcc7b56f238" }, "source": [ "from typing import Iterator\n", "\n", "d1 = \"it is what it is\"\n", "d2 = \"what is it\"\n", "d3 = \"it is a banana\"\n", "documents = [d1, d2, d3]\n", "\n", "def RECORDREADER():\n", " for (docid, document) in enumerate(documents):\n", " yield (\"{}\".format(docid), document)\n", " \n", "def MAP(docId:str, body:str):\n", " for word in set(body.split(' ')):\n", " yield (word, docId)\n", " \n", "def REDUCE(word:str, docIds:Iterator[str]):\n", " yield (word, sorted(docIds))\n", "\n", "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", "output = list(output)\n", "output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[('what', ['0', '1']),\n", " ('is', ['0', '1', '2']),\n", " ('it', ['0', '1', '2']),\n", " ('a', ['2']),\n", " ('banana', ['2'])]" ] }, "metadata": { "tags": [] }, "execution_count": 30 } ] }, { "cell_type": "markdown", "metadata": { "id": "R7az-6DA6qr2" }, "source": [ "## WordCount" ] }, { "cell_type": "code", "metadata": { "id": "dN-nbtgG6uYG", "colab": { "base_uri": "https://localhost:8080/", "height": 34 }, "outputId": "24117576-7931-401d-a581-28e246b23453" }, "source": [ "from typing import Iterator\n", "\n", "d1 = \"\"\"\n", "it is what it is\n", "it is what it is\n", "it is what it is\"\"\"\n", "d2 = \"\"\"\n", "what is it\n", "what is it\"\"\"\n", "d3 = \"\"\"\n", "it is a banana\"\"\"\n", "documents = [d1, d2, d3]\n", "\n", "def RECORDREADER():\n", " for (docid, document) in enumerate(documents):\n", " for (lineid, line) in enumerate(document.split('\\n')):\n", " yield (\"{}:{}\".format(docid,lineid), line)\n", "\n", "def MAP(docId:str, line:str):\n", " # split() with no argument drops empty strings, so the blank first line of each document yields no words\n", " for word in line.split():\n", " yield (word, 1)\n", " \n", "def REDUCE(word:str,
counts:Iterator[int]):\n", " sum = 0\n", " for c in counts:\n", " sum += c\n", " yield (word, sum)\n", "\n", "output = MapReduce(RECORDREADER, MAP, REDUCE)\n", "output = list(output)\n", "output" ], "execution_count": null, "outputs": [ { "output_type": "execute_result", "data": { "text/plain": [ "[('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]" ] }, "metadata": { "tags": [] }, "execution_count": 31 } ] }, { "cell_type": "markdown", "metadata": { "id": "h-jRAcYCAkkk" }, "source": [ "# MapReduce Distributed\n", "\n", "В модель добавляются фабрика RECORDREADER-ов — INPUTFORMAT, функция распределения промежуточных результатов по партициям — PARTITIONER, и функция COMBINER для частичной агрегации промежуточных результатов до их распределения по новым партициям." ] }, { "cell_type": "code", "metadata": { "id": "nw-b-xJsApgW" }, "source": [ "def flatten(nested_iterable):\n", " for iterable in nested_iterable:\n", " for element in iterable:\n", " yield element\n", "\n", "def groupbykey(iterable):\n", " t = {}\n", " for (k2, v2) in iterable:\n", " # append in place: rebuilding the list with + for every value would be quadratic per key\n", " t.setdefault(k2, []).append(v2)\n", " return t.items()\n", " \n", "def groupbykey_distributed(map_partitions, PARTITIONER):\n", " global reducers\n", " partitions = [dict() for _ in range(reducers)]\n", " for map_partition in map_partitions:\n", " for (k2, v2) in map_partition:\n", " p = partitions[PARTITIONER(k2)]\n", " p.setdefault(k2, []).append(v2)\n", " return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]\n", " \n", "def PARTITIONER(obj):\n", " global reducers\n", " # NOTE: hash() of str/bytes is salted per process (PYTHONHASHSEED), so string keys may move to another partition after a kernel restart\n", " return hash(obj) % reducers\n", " \n", "def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):\n", " map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())\n", " # COMBINER is called with (k2, [v2, ...]) groups of a single map partition and must yield (k2, v2) pairs\n", " if COMBINER is not None:\n", " map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2),
groupbykey(map_partition))), map_partitions)\n", " reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle\n", " reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)\n", " \n", " print(\"{} key-value pairs were sent over a network.\".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))\n", " return reduce_outputs" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "kxirlf3XqZxY" }, "source": [ "## Спецификация MapReduce Distributed\n", "\n", "\n", "```\n", "f (k1, v1) -> (k2,v2)*\n", "g (k2, v2*) -> (k3,v3)*\n", " \n", "e1 (k1, v1)\n", "e2 (k2, v2)\n", "partition1 (k2, v2)*\n", "partition2 (k2, v2*)*\n", " \n", "flatmap (e1->e2*, e1*) -> partition1*\n", "groupby (partition1*) -> partition2*\n", "\n", "mapreduce ((k1,v1)*) -> (k3,v3)*\n", "mapreduce .flatmap(f).groupby(k2).flatmap(g)\n", "```\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "id": "CWYw_CpbbY3C" }, "source": [ "## WordCount " ] }, { "cell_type": "code", "metadata": { "id": "uR_zfGFkMZlp", "colab": { "base_uri": "https://localhost:8080/", "height": 68 }, "outputId": "c8d46167-473d-43b9-881a-2396991b3731" }, "source": [ "from typing import Iterator\n", "import numpy as np\n", "\n", "d1 = \"\"\"\n", "it is what it is\n", "it is what it is\n", "it is what it is\"\"\"\n", "d2 = \"\"\"\n", "what is it\n", "what is it\"\"\"\n", "d3 = \"\"\"\n", "it is a banana\"\"\"\n", "documents = [d1, d2, d3, d1, d2, d3]\n", "\n", "maps = 3\n", "reducers = 2\n", "\n", "def INPUTFORMAT():\n", " global maps\n", " \n", " def RECORDREADER(split):\n", " for (docid, document) in enumerate(split):\n", " for (lineid, line) in enumerate(document.split('\\n')):\n", " yield (\"{}:{}\".format(docid,lineid), line)\n", " \n", " split_size = int(np.ceil(len(documents)/maps))\n", " for 
i in range(0, len(documents), split_size):\n", " yield RECORDREADER(documents[i:i+split_size])\n", "\n", "def MAP(docId:str, line:str):\n", " # split() with no argument drops empty strings, so the blank first line of each document yields no words\n", " for word in line.split():\n", " yield (word, 1)\n", " \n", "def REDUCE(word:str, counts:Iterator[int]):\n", " sum = 0\n", " for c in counts:\n", " sum += c\n", " yield (word, sum)\n", " \n", "# try setting COMBINER=REDUCE and compare the number of key-value pairs sent over the network\n", "partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) \n", "partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]\n", "partitioned_output" ], "execution_count": null, "outputs": [ { "output_type": "stream", "text": [ "50 key-value pairs were sent over a network.\n" ], "name": "stdout" }, { "output_type": "execute_result", "data": { "text/plain": [ "[(0, [('a', 2), ('banana', 2), ('is', 18), ('it', 18), ('what', 10)]), (1, [])]" ] }, "metadata": { "tags": [] }, "execution_count": 33 } ] }, { "cell_type": "markdown", "metadata": { "id": "gCJGx8IQ87xS" }, "source": [ "## TeraSort" ] }, { "cell_type": "code", "metadata": { "id": "P2v8v1v_8_YR", "colab": { "base_uri": "https://localhost:8080/", "height": 578 }, "outputId": "e0987c25-9757-46cb-8e55-d5d2adfbee2b" }, "source": [ "import numpy as np\n", "\n", "input_values = np.random.rand(30)\n", "maps = 3\n", "reducers = 2\n", "min_value = 0.0\n", "max_value = 1.0\n", "\n", "def INPUTFORMAT():\n", " global maps\n", " \n", " def RECORDREADER(split):\n", " for value in split:\n", " yield (value, None)\n", " \n", " split_size = int(np.ceil(len(input_values)/maps))\n", " for i in range(0, len(input_values), split_size):\n", " yield RECORDREADER(input_values[i:i+split_size])\n", " \n", "def MAP(value:int, _):\n", " yield (value, None)\n", " \n", "def PARTITIONER(key):\n", " global reducers\n", " global max_value\n", " global min_value\n", " bucket_size = (max_value-min_value)/reducers\n", " bucket_id = 0\n",
" while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size