
Big Data and Internet Thinking


Academic year: 2022


(1)


Chentao Wu

Associate Professor

Dept. of Computer Science and Engineering

(2)

Download lectures

• ftp://public.sjtu.edu.cn

• User: wuct

• Password: wuct123456

• http://www.cs.sjtu.edu.cn/~wuct/bdit/

(3)

Schedule

• lec1: Introduction on big data, cloud computing & IoT

• lec2: Parallel processing framework (e.g., MapReduce)

• lec3: Advanced parallel processing techniques (e.g., YARN, Spark)

• lec4: Cloud & Fog/Edge Computing

• lec5: Data reliability & data consistency

• lec6: Distributed file system & object-based storage

• lec7: Metadata management & NoSQL Database

• lec8: Big Data Analytics

(4)

Collaborators

(5)

Contents

1. Introduction to Map-Reduce 2.0

(6)

Classic Map-Reduce Task (MRv1)

MapReduce 1 (“classic”) has three main components:

API→for user-level programming of MR applications

Framework→runtime services for running Map and Reduce processes, shuffling and sorting, etc.

Resource management→infrastructure to monitor nodes, allocate resources, and schedule jobs
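The framework layer's map, shuffle/sort and reduce phases can be sketched in a single process. This is a toy under stated assumptions: input is an in-memory list of lines, the job is word count, and `map_reduce`, `wc_mapper` and `wc_reducer` are hypothetical names, not Hadoop's API. Sorting by key stands in for the shuffle-and-sort step.

```python
from itertools import groupby
from operator import itemgetter
from typing import Callable, Iterable, Iterator, List, Tuple

Pair = Tuple[str, int]


def map_reduce(records: Iterable[str],
               mapper: Callable[[str], Iterator[Pair]],
               reducer: Callable[[str, List[int]], Pair]) -> List[Pair]:
    """Toy, single-process model of the MapReduce dataflow (not Hadoop's API)."""
    # Map phase: every input record yields zero or more (key, value) pairs.
    intermediate = [pair for record in records for pair in mapper(record)]
    # Shuffle/sort phase: sorting by key places equal keys next to each other.
    intermediate.sort(key=itemgetter(0))
    # Reduce phase: one reducer call per distinct key, with all of its values.
    return [reducer(key, [value for _, value in group])
            for key, group in groupby(intermediate, key=itemgetter(0))]


def wc_mapper(line: str) -> Iterator[Pair]:
    for word in line.split():
        yield (word, 1)


def wc_reducer(word: str, counts: List[int]) -> Pair:
    return (word, sum(counts))


print(map_reduce(["to be or", "not to be"], wc_mapper, wc_reducer))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

In Hadoop the map and reduce functions run as separate processes on many nodes, and the shuffle moves data over the network. Here the sort-then-group step only mimics the ordering a reducer sees.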

(7)

MRv1: Batch Focus

(Figure: HADOOP 1.0, built for web-scale batch apps. Each workload (BATCH, INTERACTIVE, ONLINE) runs as a single app on its own HDFS cluster.)

All other usage patterns MUST leverage the same infrastructure

Forces creation of silos to manage mixed workloads

(8)

YARN (MRv2)

MapReduce 2 moves resource management to YARN

MapReduce originally architected at Yahoo in 2008

“alpha” in Hadoop 2 (pre-GA)

YARN promoted to sub-project in Hadoop in 2013 (Best Paper in SOCC 2013)

(9)

Why is YARN needed? (1)

MapReduce 1 resource management issues

Inflexible “slots” configured on nodes → map or reduce, not both

Underutilization of cluster when more map or reduce tasks are running

Cannot share resources with non-MR applications running on the Hadoop cluster (e.g., Impala, Apache Giraph)

Scalability → one JobTracker per cluster, with a limit of about 4,000 nodes per cluster

(10)

Busy JobTracker on a large Apache Hadoop cluster (MRv1)

(11)

Why is YARN needed? (2)

YARN Solutions

No slots

Nodes have “resources” → memory and CPU cores – which are allocated to applications when requested

Supports MR and non-MR applications running on the same cluster

Most JobTracker functions moved to the ApplicationMaster → one cluster can have many ApplicationMasters
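The shift from slots to resources can be sketched with a toy first-fit allocator. The model assumes each node advertises only free memory and vcores, and that a request is placed on the first node where it fits. `Node` and `allocate` are hypothetical names. Real YARN schedulers, such as the Capacity and Fair schedulers, are far more involved. The point is that a map task, a reduce task or a non-MR process can take any capacity that fits, with no pre-assigned slot type.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Node:
    """A worker described by free resources instead of map/reduce slots."""
    name: str
    free_mem_mb: int
    free_vcores: int


def allocate(nodes: List[Node], mem_mb: int, vcores: int) -> Optional[str]:
    """First-fit placement of one container request (toy model, not YARN's
    scheduler). Returns the chosen node's name, or None if nothing fits."""
    for node in nodes:
        if node.free_mem_mb >= mem_mb and node.free_vcores >= vcores:
            node.free_mem_mb -= mem_mb
            node.free_vcores -= vcores
            return node.name
    return None


cluster = [Node("n1", 8192, 4), Node("n2", 8192, 4)]
# Map tasks, a reduce task and a non-MR process all draw from one pool.
requests = [("map", 2048, 1), ("map", 2048, 1), ("reduce", 4096, 2),
            ("non-MR", 6144, 2)]
for kind, mem, cores in requests:
    print(kind, "->", allocate(cluster, mem, cores))
```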

(12)

YARN: Taking Hadoop Beyond Batch

Applications run natively in Hadoop:

• BATCH (MapReduce)
• INTERACTIVE (Tez)
• ONLINE (HBase)
• STREAMING (Storm, S4,…)
• GRAPH (Giraph)
• IN-MEMORY (Spark)
• HPC MPI (OpenMPI)
• OTHER (Search) (Weave…)

YARN (Cluster Resource Management)

HDFS2 (Redundant, Reliable Storage)

Store ALL DATA in one place…

Interact with that data in MULTIPLE WAYS with Predictable Performance and Quality of Service

(13)

YARN: Efficiency with Shared Services

Yahoo! leverages YARN

40,000+ nodes running YARN across over 365 PB of data

~400,000 jobs per day for about 10 million hours of compute time

Estimated a 60%–150% improvement in node usage per day using YARN

Eliminated a colo (~10K nodes) due to increased utilization

For more details, check out the YARN SoCC 2013 paper

(14)

YARN and MapReduce

YARN does not know or care what kind of application is running

Could be MR or something else (e.g., Impala)

MR2 uses YARN

Hadoop includes a MapReduce ApplicationMaster (AM) to manage MR jobs

Each MapReduce job is a new instance of an application

(15)

Running a MapReduce Application in MRv2 (1)

(16)

Running a MapReduce Application in MRv2 (2)

(17)

Running a MapReduce Application in MRv2 (3)

(18)

Running a MapReduce Application in MRv2 (4)

(19)

Running a MapReduce Application in MRv2 (5)

(20)

Running a MapReduce Application in MRv2 (6)

(21)

Running a MapReduce Application in MRv2 (7)

(22)

Running a MapReduce Application in MRv2 (8)

(23)

Running a MapReduce Application in MRv2 (9)

(24)

Running a MapReduce Application in MRv2 (10)

(25)

The MapReduce Framework on YARN

In YARN, Shuffle is run as an auxiliary service

Runs in the NodeManager JVM as a persistent service

(26)

Contents

2. Introduction to Spark

(27)

What is Spark?

Fast, expressive cluster computing system compatible with Apache Hadoop

Works with any Hadoop-supported storage system (HDFS, S3, Avro, …)

Improves efficiency through:

In-memory computing primitives

General computation graphs

Improves usability through:

Rich APIs in Java, Scala, Python

Interactive shell

Up to 100× faster

Often 2-10× less code

(28)

How to Run It & Languages

Local multicore: just a library in your program

EC2: scripts for launching a Spark cluster

Private cluster: Mesos, YARN, Standalone Mode

APIs in Java, Scala and Python

Interactive shells in Scala and Python

(29)

Spark Framework

(Figure: Spark + Hive, Spark + Pregel)

(30)

Key Idea

Work with distributed collections as you would with local ones

Concept: resilient distributed datasets (RDDs)

Immutable collections of objects spread across a cluster

Built through parallel transformations (map, filter, etc.)

Automatically rebuilt on failure

Controllable persistence (e.g. caching in RAM)
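Three of these properties can be sketched in one process: immutability, lazily built transformations, and opt-in caching. `ToyRDD` is a hypothetical class written for illustration. It is not Spark's implementation and has no partitions or cluster. Each transformation returns a new object holding a recipe, and only an action runs the recipe.

```python
from typing import Callable, Iterable, List, Optional


class ToyRDD:
    """Single-process toy of three RDD ideas: immutability, lazy
    transformations and opt-in caching. Not Spark's implementation."""

    def __init__(self, compute: Callable[[], List]):
        self._compute = compute               # recipe for this dataset's elements
        self._cache_requested = False
        self._cached: Optional[List] = None

    @staticmethod
    def from_iterable(data: Iterable) -> "ToyRDD":
        items = list(data)
        return ToyRDD(lambda: list(items))

    # Transformations build a NEW ToyRDD and compute nothing yet.
    def map(self, f: Callable) -> "ToyRDD":
        return ToyRDD(lambda: [f(x) for x in self._elements()])

    def filter(self, pred: Callable) -> "ToyRDD":
        return ToyRDD(lambda: [x for x in self._elements() if pred(x)])

    def cache(self) -> "ToyRDD":
        self._cache_requested = True          # keep results after first use
        return self

    def _elements(self) -> List:
        if self._cached is not None:
            return self._cached
        result = self._compute()
        if self._cache_requested:
            self._cached = result
        return result

    # Actions force evaluation of the whole chain.
    def collect(self) -> List:
        return list(self._elements())

    def count(self) -> int:
        return len(self._elements())


calls = []


def square(x):
    calls.append(x)                           # record every real computation
    return x * x


squares = ToyRDD.from_iterable([1, 2, 3, 4]).map(square).cache()
evens = squares.filter(lambda x: x % 2 == 0)  # still nothing computed
print(calls)                                  # []
print(evens.collect(), squares.count())       # [4, 16] 4
print(len(calls))                             # 4: square ran once per element
```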

(31)

Spark Runtime

Spark runs as a library in your program (1 instance per app)

Runs tasks locally or on Mesos

new SparkContext(masterUrl, jobname, [sparkhome], [jars])

MASTER=local[n] ./spark-shell

MASTER=HOST:PORT ./spark-shell

(32)

Example: Mining Console Logs

Load error messages from a log into memory, then interactively search for patterns

lines = sc.textFile("hdfs://...")                       # Base RDD
errors = lines.filter(lambda s: s.startswith("ERROR"))  # Transformed RDD
messages = errors.map(lambda s: s.split("\t")[2])
messages.cache()

messages.filter(lambda s: "foo" in s).count()          # Action
messages.filter(lambda s: "bar" in s).count()
. . .

(Figure: the Driver sends tasks to three Workers. Each Worker reads one HDFS block (Block 1, 2, 3) and keeps its partition of messages in memory (Cache 1, 2, 3). Results return to the Driver.)

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)

Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)

(33)

RDD Fault Tolerance

RDDs track the transformations used to build them (their lineage) to recompute lost data

E.g.: messages = textFile(...).filter(lambda s: "ERROR" in s) \
                               .map(lambda s: s.split("\t")[2])

Lineage: HadoopRDD (path = hdfs://…) → FilteredRDD (func = contains(...)) → MappedRDD (func = split(…))
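Lineage-based recovery can be sketched by storing the recipe (filter, then map) and replaying it on a single partition's source block. The three log "blocks" and the names `BLOCKS`, `LINEAGE` and `compute_partition` are hypothetical. This illustrates the idea and is not Spark's recovery code. The `reads` list shows that losing partition 1 re-reads only block 1.

```python
from typing import Callable, Dict, List, Tuple

# Lineage = ordered (name, per-partition function) steps applied to a source.
Step = Tuple[str, Callable[[List[str]], List[str]]]

# Hypothetical input: three "HDFS blocks" of tab-separated log lines.
BLOCKS = [
    ["ERROR\tdisk\tfoo failed", "INFO\tnet\tok"],
    ["ERROR\tnet\tbar timeout"],
    ["WARN\tcpu\thot", "ERROR\tdisk\tfoo again"],
]
reads: List[int] = []  # which source blocks were read, in order


def read_block(index: int) -> List[str]:
    reads.append(index)
    return list(BLOCKS[index])


LINEAGE: List[Step] = [
    ("filter", lambda part: [s for s in part if "ERROR" in s]),  # like FilteredRDD
    ("map", lambda part: [s.split("\t")[2] for s in part]),      # like MappedRDD
]


def compute_partition(index: int) -> List[str]:
    """Build one partition by replaying the lineage on its own source block."""
    data = read_block(index)
    for _name, step in LINEAGE:
        data = step(data)
    return data


cache: Dict[int, List[str]] = {i: compute_partition(i) for i in range(len(BLOCKS))}
del cache[1]                     # simulate losing the node holding partition 1
cache[1] = compute_partition(1)  # recovery re-reads block 1 only
print(reads)                     # [0, 1, 2, 1]
```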

(34)

Which Language Should I Use?

Standalone programs can be written in any of them, but the interactive console is only available for Python & Scala

Python developers: can stay with Python for both

Java developers: consider using Scala for console (to learn the API)

Performance: Java / Scala will be faster (statically typed), but Python can do well for numerical work with NumPy

(35)

Iterative Processing in Hadoop

(36)

Throughput Mem vs. Disk

• Typical throughput of disk: ~100 MB/sec

• Typical throughput of main memory: ~50 GB/sec

• => Main memory delivers ~500 times the throughput of disk
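The ~500× figure follows directly from the two rates. The block below also works out how long each medium takes to scan one terabyte. It assumes decimal units (1 GB = 1,000 MB, 1 TB = 10^6 MB) and a single sequential read at the typical rates listed. The 1 TB size is only an illustrative choice.

```latex
\frac{50\ \text{GB/s}}{100\ \text{MB/s}}
  = \frac{50\,000\ \text{MB/s}}{100\ \text{MB/s}} = 500

t_{\text{disk}}(1\ \text{TB}) = \frac{10^{6}\ \text{MB}}{100\ \text{MB/s}}
  = 10^{4}\ \text{s} \approx 2.8\ \text{h},
\qquad
t_{\text{mem}}(1\ \text{TB}) = \frac{10^{6}\ \text{MB}}{50\,000\ \text{MB/s}} = 20\ \text{s}
```

For iterative jobs that re-read the same data on every pass, this gap is paid once per iteration. That is why keeping the working set in memory matters.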

(37)

Spark → In-Memory Data Sharing

(38)

Spark vs. Hadoop MapReduce (3)

(39)

Spark vs. Hadoop MapReduce (4)

(40)

On-Disk Sort Record: Time to sort 100 TB

2013 Record (Hadoop): 72 minutes on 2100 machines

2014 Record (Spark): 23 minutes on 207 machines

Also sorted 1 PB in 4 hours

Source: Daytona GraySort benchmark, sortbenchmark.org

(41)

Powerful Stack – Agile Development (1)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark)

(42)

Powerful Stack – Agile Development (2)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark; Streaming is added on top of Spark)

(43)

Powerful Stack – Agile Development (3)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark; SparkSQL and Streaming are added on top of Spark)

(44)

Powerful Stack – Agile Development (4)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark; SparkSQL and Streaming are added on top of Spark)

(45)

Powerful Stack – Agile Development (5)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark; GraphX, Streaming and SparkSQL are added on top of Spark)

(46)

Powerful Stack – Agile Development (6)

(Chart: non-test, non-example source lines, on a 0 to 140,000 scale, for Hadoop MapReduce, Storm (Streaming), Impala (SQL), Giraph (Graph) and Spark; GraphX, Streaming, SparkSQL and "Your fancy SIGMOD technique here" are added on top of Spark)

(47)

Contents

3. Spark Programming

(48)

Learning Spark

Easiest way: Spark interpreter (spark-shell or pyspark)

Special Scala and Python consoles for cluster use

Runs in local mode on 1 thread by default, but this can be controlled with the MASTER environment variable:

MASTER=local ./spark-shell              # local, 1 thread
MASTER=local[2] ./spark-shell           # local, 2 threads
MASTER=spark://host:port ./spark-shell  # Spark standalone cluster

(49)

First Step: SparkContext

Main entry point to Spark functionality

Created for you in Spark shells as variable sc

In standalone programs, you’d make your own (see later for details)

(50)

Creating RDDs

# Turn a local collection into an RDD
sc.parallelize([1, 2, 3])

# Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")

# Use any existing Hadoop InputFormat
sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)

(51)

Basic Transformations

nums = sc.parallelize([1, 2, 3])

# Pass each element through a function
squares = nums.map(lambda x: x*x)  # => {1, 4, 9}

# Keep elements passing a predicate
even = squares.filter(lambda x: x % 2 == 0)  # => {4}

# Map each element to zero or more others
# (range(0, x) is a sequence of numbers 0, 1, …, x-1)
nums.flatMap(lambda x: range(0, x))  # => {0, 0, 1, 0, 1, 2}
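The element-level results in the comments above can be checked with plain Python on a local list. List comprehensions and `itertools.chain` stand in for the RDD methods here. This shows what each transformation does to elements. It does not show distribution or lazy evaluation.

```python
from itertools import chain

nums = [1, 2, 3]

# map: pass each element through a function
squares = [x * x for x in nums]

# filter: keep elements passing a predicate
even = [x for x in squares if x % 2 == 0]

# flatMap: map each element to zero or more outputs, then flatten
flat = list(chain.from_iterable(range(0, x) for x in nums))

print(squares, even, flat)  # [1, 4, 9] [4] [0, 0, 1, 0, 1, 2]
```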

(52)

Basic Actions

nums = sc.parallelize([1, 2, 3])

# Retrieve RDD contents as a local collection
nums.collect()  # => [1, 2, 3]

# Return first K elements
nums.take(2)  # => [1, 2]

# Count number of elements
nums.count()  # => 3

# Merge elements with an associative (and commutative) function
nums.reduce(lambda x, y: x + y)  # => 6

# Write elements to a text file (written as a directory of part files)
nums.saveAsTextFile("hdfs://file.txt")
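The function passed to `reduce` must be associative because the work is split. Each partition is folded on its own, and the partial results are then folded together. A single-process sketch of that two-level fold follows. `partitioned_reduce` is a hypothetical helper, and splitting into contiguous chunks is an assumption. PySpark's documentation for `RDD.reduce` also requires commutativity, since partial results may be merged in any order.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, f, num_partitions):
    """Toy model of a parallel reduce: fold each partition independently,
    then fold the partial results together (single process, no Spark)."""
    if not data:
        raise ValueError("cannot reduce an empty collection")
    size = -(-len(data) // num_partitions)  # ceiling division
    parts = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, part) for part in parts]
    return reduce(f, partials)


nums = [1, 2, 3, 4]
print(reduce(add, nums), partitioned_reduce(nums, add, 2))  # 10 10
print(reduce(sub, nums), partitioned_reduce(nums, sub, 2))  # -8 0
```

Addition gives the same answer however the data is split. Subtraction is not associative, so splitting changes the result.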

(53)

Working with Key-Value Pairs

• Spark’s “distributed reduce” transformations act on RDDs of key-value pairs

• Python:

pair = (a, b)
pair[0]  # => a
pair[1]  # => b

• Scala:

val pair = (a, b)
pair._1  // => a
pair._2  // => b

• Java:

Tuple2 pair = new Tuple2(a, b);  // class scala.Tuple2
pair._1()  // => a
pair._2()  // => b

(54)

Some Key-Value Operations

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

pets.reduceByKey(lambda x, y: x + y)
# => {(cat, 3), (dog, 1)}

pets.groupByKey()
# => {(cat, Seq(1, 2)), (dog, Seq(1))}

pets.sortByKey()
# => {(cat, 1), (cat, 2), (dog, 1)}

reduceByKey also automatically implements combiners on the map side
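A map-side combiner merges values for the same key inside each partition before anything is shuffled. This can cut the number of records sent over the network. The sketch below models that with in-memory lists. `merge_into` and `reduce_by_key` are hypothetical helpers, not Spark's code. `groupByKey` has no such step, so it ships every pair.

```python
from operator import add


def merge_into(acc, pairs, f):
    """Fold (key, value) pairs into dict acc, combining equal keys with f."""
    for key, value in pairs:
        acc[key] = f(acc[key], value) if key in acc else value
    return acc


def reduce_by_key(partitions, f, map_side_combine=True):
    """Toy reduceByKey over in-memory partitions (not Spark's code).

    Returns (merged result, number of records that cross the shuffle)."""
    if map_side_combine:
        # Combine within each partition before anything is shuffled.
        outgoing = [list(merge_into({}, part, f).items()) for part in partitions]
    else:
        outgoing = partitions
    shuffled = sum(len(part) for part in outgoing)
    result = {}
    for part in outgoing:
        merge_into(result, part, f)
    return result, shuffled


pets = [[("cat", 1), ("dog", 1), ("cat", 2)]]             # one map-side partition
print(reduce_by_key(pets, add))                          # ({'cat': 3, 'dog': 1}, 2)
print(reduce_by_key(pets, add, map_side_combine=False))  # ({'cat': 3, 'dog': 1}, 3)
```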

(55)

Example: Word Count

lines = sc.textFile("hamlet.txt")
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)

(Figure: "to be or" and "not to be" → flatMap → "to" "be" "or" / "not" "to" "be" → map → (to, 1) (be, 1) (or, 1) / (not, 1) (to, 1) (be, 1) → reduceByKey → (be, 2) (not, 1) / (or, 1) (to, 2))

(56)

Multiple Datasets

visits = sc.parallelize([("index.html", "1.2.3.4"),
                         ("about.html", "3.4.5.6"),
                         ("index.html", "1.3.3.1")])
pageNames = sc.parallelize([("index.html", "Home"),
                            ("about.html", "About")])

visits.join(pageNames)
# ("index.html", ("1.2.3.4", "Home"))
# ("index.html", ("1.3.3.1", "Home"))
# ("about.html", ("3.4.5.6", "About"))

visits.cogroup(pageNames)
# ("index.html", (Seq("1.2.3.4", "1.3.3.1"), Seq("Home")))
# ("about.html", (Seq("3.4.5.6"), Seq("About")))
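The semantics of `join` and `cogroup` on pair datasets can be reproduced on plain lists of tuples. `join` is an inner join that emits one row per matching (left, right) value pair. `cogroup` keeps every key from either side together with all of its values. The helper names are hypothetical, and Python lists stand in for Spark's `Seq` and iterables. Spark itself makes no promise about output order.

```python
from collections import defaultdict


def join(left, right):
    """Inner join of two lists of (key, value) pairs (toy, single process)."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


def cogroup(left, right):
    """For every key on either side: key -> (left values, right values)."""
    groups = defaultdict(lambda: ([], []))
    for key, value in left:
        groups[key][0].append(value)
    for key, value in right:
        groups[key][1].append(value)
    return dict(groups)


visits = [("index.html", "1.2.3.4"), ("about.html", "3.4.5.6"),
          ("index.html", "1.3.3.1")]
pageNames = [("index.html", "Home"), ("about.html", "About")]

for row in join(visits, pageNames):
    print(row)
print(cogroup(visits, pageNames))
```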

(57)

Controlling the level of parallelism

• All the pair RDD operations take an optional second parameter for number of tasks

words.reduceByKey(lambda x, y: x + y, 5)
words.groupByKey(5)
visits.join(pageNames, 5)
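The number-of-tasks argument sets how many output partitions the keys are spread over. With hash partitioning, each key goes to `hash(key) % numPartitions`, so all values for one key meet in the same task. In the sketch below, `partition_by_key` is hypothetical and `zlib.crc32` is a stand-in hash. It is not the hash function Spark actually uses.

```python
import zlib


def partition_by_key(pairs, num_partitions):
    """Toy hash partitioner: bucket each (key, value) pair by its key so that
    every value for a given key ends up in the same reduce task."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # zlib.crc32 is a stand-in hash chosen because it is stable across runs
        # (Python's built-in hash() of a str is salted per process).
        index = zlib.crc32(str(key).encode("utf-8")) % num_partitions
        buckets[index].append((key, value))
    return buckets


words = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
for i, bucket in enumerate(partition_by_key(words, 5)):
    print("task", i, bucket)
```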

(58)

Using Local Variables

• External variables you use in a closure will automatically be shipped to the cluster:

query = input("Enter a query:")

pages.filter(lambda x: x.startswith(query)).count()

• Some caveats:

• Each task gets a new copy (updates aren’t sent back)

• Variable must be Serializable (Java/Scala) or Pickle-able (Python)

• Don’t use fields of an outer object (ships all of it!)
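The first two caveats can be simulated with the standard `pickle` module. Round-tripping a value through pickle gives each "task" an independent copy, so its updates never reach the driver. Objects such as locks cannot be pickled at all. `ship`, `is_shippable` and `run_task` are hypothetical helpers. PySpark itself serializes functions with cloudpickle, which handles lambdas, but captured values still travel as copies.

```python
import pickle
import threading


def ship(value):
    """Simulate sending a captured variable to a task: serialize it, then
    deserialize an independent copy, as a remote worker would receive it."""
    return pickle.loads(pickle.dumps(value))


def is_shippable(value):
    """True if value can be serialized with the standard pickle module."""
    try:
        pickle.dumps(value)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


def run_task(counter, partition):
    """A 'task' that updates its own copy of a captured counter."""
    for _ in partition:
        counter["seen"] += 1
    return counter["seen"]


driver_counter = {"seen": 0}
task_results = [run_task(ship(driver_counter), part) for part in ([1, 2], [3, 4, 5])]
print(task_results, driver_counter)    # [2, 3] {'seen': 0}
print(is_shippable(threading.Lock()))  # False
```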

(59)

Closure Mishap Example

class MyCoolRddApp {
  val param = 3.14
  val log = new Log(...)
  ...
  def work(rdd: RDD[Int]) {
    rdd.map(x => x + param)
       .reduce(...)
  }
}

→ NotSerializableException: MyCoolRddApp (or Log)

How to get around it:

class MyCoolRddApp {
  ...
  def work(rdd: RDD[Int]) {
    val param_ = param
    rdd.map(x => x + param_)
       .reduce(...)
  }
}

→ References only local variable instead of this.param
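The same mishap has a Python analogue. A function that reads `self.param` captures `self`, so serializing it drags in every field, including one that cannot be serialized. Copying the field into a local first captures only a float. This sketch makes several assumptions: it uses the standard `pickle` module and `functools.partial` to make the capture explicit, `threading.Lock` plays the role of the unserializable `Log`, and `add_app_param` and `serializes` are hypothetical names.

```python
import functools
import operator
import pickle
import threading


def add_app_param(app, x):
    return x + app.param  # reads one field, but needs the whole app object


class MyCoolRddApp:
    def __init__(self):
        self.param = 3.14
        self.log = threading.Lock()  # stand-in for an unserializable Log

    def bad_func(self):
        # The function captures self, so shipping it would drag self.log along.
        return functools.partial(add_app_param, self)

    def good_func(self):
        # Copy the field into a local first; only a float is captured.
        param_ = self.param
        return functools.partial(operator.add, param_)


def serializes(func):
    """True if func (and everything it captured) can be pickled."""
    try:
        pickle.dumps(func)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


app = MyCoolRddApp()
print(serializes(app.good_func()), serializes(app.bad_func()))  # True False
```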

(60)

Build Spark

• Requires Java 6+, Scala 2.9.2

git clone git://github.com/mesos/spark
cd spark

sbt/sbt package

# Optional: publish to local Maven cache

sbt/sbt publish-local

(61)

Add Spark into Your Project

• Scala and Java: add a Maven dependency on
  groupId: org.spark-project
  artifactId: spark-core_2.9.1
  version: 0.7.0-SNAPSHOT

• Python: run program with our pyspark script

(62)

Create a SparkContext

Scala:

import spark.SparkContext
import spark.SparkContext._

val sc = new SparkContext("masterUrl", "name", "sparkHome", Seq("app.jar"))

Java:

import spark.api.java.JavaSparkContext;

JavaSparkContext sc = new JavaSparkContext(
    "masterUrl", "name", "sparkHome", new String[] {"app.jar"});

Python:

from pyspark import SparkContext

sc = SparkContext("masterUrl", "name", "sparkHome", ["library.py"])

Arguments: cluster URL (or local / local[N]), app name, Spark install path on the cluster, and a list of JARs with app code to ship (for Python, a list of library files).

(63)

Complete App: Scala

import spark.SparkContext
import spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "WordCount", args(0), Seq(args(1)))
    val lines = sc.textFile(args(2))
    lines.flatMap(_.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)
         .saveAsTextFile(args(3))
  }
}

(64)

Complete App: Python

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "WordCount")
    lines = sc.textFile(sys.argv[1])
    lines.flatMap(lambda s: s.split(" ")) \
         .map(lambda word: (word, 1)) \
         .reduceByKey(lambda x, y: x + y) \
         .saveAsTextFile(sys.argv[2])

(65)

Contents

4. Graph Computing

(66)

Graphs are everywhere

(Figure: program flow, ecological network, biological network, social network, chemical network, web graph)

(67)

Complex Graphs

• Real-life graphs contain complex content: labels associated with nodes, edges and graphs.

Node Labels:

Location, Gender, Charts, Library, Events, Groups, Journal, Tags, Age, Tracks.
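Labels on the graph, its nodes and its edges can be held in a minimal in-memory structure. `LabeledGraph`, its methods, and the people in the example are all hypothetical. The node label keys (location, gender, age) come from the list above. Production graph systems use far more compact, partitioned representations.

```python
from collections import defaultdict


class LabeledGraph:
    """Minimal labeled graph: labels on the graph, on nodes and on edges."""

    def __init__(self, **graph_labels):
        self.labels = dict(graph_labels)
        self.node_labels = {}
        self.edges = defaultdict(dict)  # src -> {dst: edge labels}

    def add_node(self, node, **labels):
        self.node_labels[node] = labels

    def add_edge(self, src, dst, **labels):
        self.edges[src][dst] = labels

    def neighbors(self, node, **match):
        """Out-neighbors whose edge labels include every key=value in match."""
        return [dst for dst, lab in self.edges.get(node, {}).items()
                if all(lab.get(k) == v for k, v in match.items())]


g = LabeledGraph(name="toy social network")
g.add_node("alice", location="Shanghai", gender="F", age=30)
g.add_node("bob", location="Beijing", gender="M", age=25)
g.add_node("carol", location="Shanghai", gender="F", age=41)
g.add_edge("alice", "bob", type="friend")
g.add_edge("alice", "carol", type="colleague")
print(g.neighbors("alice", type="friend"))  # ['bob']
```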

(68)

Large Graphs

Network       # of Users    # of Links
Facebook      400 Million   52K Million
Twitter       105 Million   10K Million
LinkedIn      60 Million    0.9K Million
Last.FM       40 Million    2K Million
LiveJournal   25 Million    2K Million
del.icio.us   5.3 Million   0.7K Million
DBLP          0.7 Million   8 Million

(69)

Thank you!
