
Efficient Processing of k Nearest Neighbor Joins using MapReduce

Wei Lu Yanyan Shen Su Chen Beng Chin Ooi National University of Singapore

{luwei1,shenyanyan,chensu,ooibc}@comp.nus.edu.sg

ABSTRACT

k nearest neighbor join (kNN join), designed to find k nearest neighbors from a dataset S for every object in another dataset R, is a primitive operation widely adopted by many data mining applications. As a combination of the k nearest neighbor query and the join operation, kNN join is an expensive operation. Given the increasing volume of data, it is difficult to perform a kNN join on a centralized machine efficiently. In this paper, we investigate how to perform kNN join using MapReduce, which is a well-accepted framework for data-intensive applications over clusters of computers. In brief, the mappers cluster objects into groups; the reducers perform the kNN join on each group of objects separately. We design an effective mapping mechanism that exploits pruning rules for distance filtering, and hence reduces both the shuffling and computational costs. To reduce the shuffling cost, we propose two approximate algorithms to minimize the number of replicas. Extensive experiments on our in-house cluster demonstrate that our proposed methods are efficient, robust and scalable.

1. INTRODUCTION

k nearest neighbor join (kNN join) is a special type of join that combines each object in a dataset R with the k objects in another dataset S that are closest to it. kNN join typically serves as a primitive operation and is widely used in many data mining and analytic applications, such as k-means and k-medoids clustering and outlier detection [5, 12].

As a combination of the k nearest neighbor (kNN) query and the join operation, kNN join is an expensive operation. The naive implementation of kNN join requires scanning S once for each object in R (computing the distance between each pair of objects from R and S), easily leading to a complexity of O(|R| · |S|). Therefore, considerable research effort has been devoted to improving the efficiency of the kNN join [4, 17, 19, 18]. Most of the existing work focuses on the design of elegant indexing techniques for avoiding scanning the whole dataset repeatedly and for pruning as many distance computations as possible.

All the existing work [4, 17, 19, 18] is proposed based on the centralized paradigm where the kNN join is performed on a single, centralized server. However, given the limited computational capability and storage of a single machine, the system will eventually suffer from performance deterioration as the size of the dataset increases, especially for multi-dimensional datasets. The cost of computing the distance between objects increases with the number of dimensions, and the curse of dimensionality leads to a decline in the pruning power of the indexes.

Given the limitations of a single machine, a natural solution is to consider parallelism in a distributed computational environment. MapReduce [6] is a programming framework for processing large scale datasets by exploiting the parallelism among a cluster of computing nodes. Soon after its birth, MapReduce gained popularity for its simplicity, flexibility, fault tolerance and scalability. MapReduce is now well studied [10] and widely used in both commercial and scientific applications. Therefore, MapReduce is an ideal framework for processing kNN join operations over massive, multi-dimensional datasets.

However, existing kNN join techniques cannot be applied to, or easily extended for, MapReduce. Most of the existing work relies on some centralized indexing structure such as the B+-tree [19] and the R-tree [4], which cannot be accommodated in such a distributed and parallel environment directly.

In this paper, we investigate the problem of implementing the kNN join operator in MapReduce. The basic idea is similar to the hash join algorithm. Specifically, the mapper assigns a key to each object from R and S; the objects with the same key are distributed to the same reducer in the shuffling process; the reducer performs the kNN join over the objects that have been shuffled to it. To guarantee the correctness of the join result, one basic requirement of data partitioning is that for each object r in R, the k nearest neighbors of r in S should be sent to the same reducer as r, i.e., the k nearest neighbors should be assigned the same key as r.

As a result, objects in S may be replicated and distributed to multiple reducers. The existence of replicas leads to a high shuffling cost and also increases the computational cost of the join operation within a reducer. Hence, a good mapping function that minimizes the number of replicas is one of the most critical factors that affect the performance of the kNN join in MapReduce.

In particular, we summarize the contributions of the paper as follows.

• We present an implementation of the kNN join operator using MapReduce, especially for large volumes of multi-dimensional data. The implementation defines the mapper and reducer jobs and requires no modifications to the MapReduce framework.

• We design an efficient mapping method that divides objects into groups, each of which is processed by a reducer to perform the kNN join. First, the objects are divided into partitions based on a Voronoi diagram with carefully selected pivots. Then, data partitions (i.e., Voronoi cells) are clustered into groups only if the distances between them are restricted by a specific bound. We derive a distance bound that leads to groups of objects that are more closely involved in the kNN join.

• We derive a cost model for computing the number of replicas generated in the shuffling process. Based on the cost model, we propose two grouping strategies that can reduce the number of replicas greedily.

• We conduct extensive experiments to study the effect of various parameters using two real datasets and some synthetic datasets. The results show that our proposed methods are efficient, robust, and scalable.

Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Articles from this volume were invited to present their results at The 38th International Conference on Very Large Data Bases, August 27th - 31st 2012, Istanbul, Turkey.

Proceedings of the VLDB Endowment, Vol. 5, No. 10

Copyright 2012 VLDB Endowment 2150-8097/12/06...$10.00.

The remainder of the paper is organized as follows. Section 2 describes some background knowledge. Section 3 gives an overview of processing kNN join in the MapReduce framework, followed by the details in Section 4. Section 5 presents the cost model and grouping strategies for reducing the shuffling cost. Section 6 reports the experimental results. Section 7 discusses related work and Section 8 concludes the paper.

2. PRELIMINARIES

In this section, we first define kNN join formally and then give a brief review of the MapReduce framework. Table 1 lists the symbols and their meanings used throughout this paper.

2.1 kNN Join

We consider data objects in an n-dimensional metric space D. Given two data objects r and s, |r, s| represents the distance between r and s in D. For ease of exposition, the Euclidean distance (L2) is used as the distance measure in this paper, i.e.,

$$|r, s| = \sqrt{\sum_{1 \le i \le n} (r[i] - s[i])^2} \tag{1}$$

where r[i] (resp. s[i]) denotes the value of r (resp. s) along the i-th dimension in D. Without loss of generality, our methods can be easily applied to other distance measures such as the Manhattan distance (L1) and the maximum distance (L∞).

DEFINITION 1. (k nearest neighbors) Given an object r, a dataset S and an integer k, the k nearest neighbors of r from S, denoted as KNN(r, S), is a set of k objects from S such that ∀o ∈ KNN(r, S), ∀s ∈ S − KNN(r, S), |o, r| ≤ |s, r|.

DEFINITION 2. (kNN join) Given two datasets R and S and an integer k, the kNN join of R and S (denoted as R ⋉_KNN S, abbreviated as R ⋉ S) combines each object r ∈ R with its k nearest neighbors from S. Formally,

$$R \ltimes S = \{(r, s) \mid \forall r \in R, \forall s \in KNN(r, S)\} \tag{2}$$

According to Definition 2, R ⋉ S is a subset of R × S. Note that the kNN join operation is asymmetric, i.e., R ⋉ S ≠ S ⋉ R. Given k ≤ |S|, the cardinality of R ⋉ S is k × |R|. In the rest of this paper, we assume that k ≤ |S|. Otherwise, the kNN join degrades to the cross join and just generates the Cartesian product R × S.
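As a point of reference for Definitions 1 and 2, the naive centralized join can be sketched in a few lines of Python. This is only an illustration of the O(|R| · |S|) baseline, not the method proposed in this paper; the function names `euclidean` and `knn_join_naive` and the toy data are assumptions made for the example.

```python
import heapq
import math

def euclidean(r, s):
    """Euclidean (L2) distance between two points given as tuples."""
    return math.sqrt(sum((ri - si) ** 2 for ri, si in zip(r, s)))

def knn_join_naive(R, S, k):
    """Map each index of R to the indices of its k nearest objects in S.

    Every object of R scans all of S, so the cost is |R| * |S| distances.
    """
    result = {}
    for i, r in enumerate(R):
        result[i] = heapq.nsmallest(
            k, range(len(S)), key=lambda j: euclidean(r, S[j]))
    return result

# Hypothetical toy data.
R = [(0.0, 0.0), (10.0, 10.0)]
S = [(1.0, 0.0), (0.0, 2.0), (9.0, 9.0), (20.0, 20.0)]
pairs = knn_join_naive(R, S, k=2)
```

With k ≤ |S|, the result holds exactly k entries per object of R, which matches the cardinality k × |R| stated above.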

Table 1: Symbols and their meanings

| Symbol | Definition |
| --- | --- |
| D | an n-dimensional metric space |
| R (resp. S) | an object set R (resp. S) in D |
| r (resp. s) | an object, r ∈ R (resp. s ∈ S) |
| \|r, s\| | the distance from r to s |
| k | the number of near neighbors |
| KNN(r, S) | the k nearest neighbors of r from S |
| R ⋉ S | kNN join of R and S |
| P | a set of pivots |
| p_i | a pivot in P |
| p_r | the pivot in P that is closest to r |
| P_i^R | the partition of R that corresponds to p_i |
| p_i.d_j | the j-th smallest distance of objects in P_i^S to p_i |
| U(P_i^R) | max{\|r, p_i\| : ∀r ∈ P_i^R} |
| L(P_i^R) | min{\|r, p_i\| : ∀r ∈ P_i^R} |
| T_R | the summary table for partitions in R |
| N | the number of reducers |

2.2 MapReduce Framework

MapReduce [6] is a popular programming framework to support data-intensive applications using shared-nothing clusters. In MapReduce, input data are represented as key-value pairs. Several functional programming primitives including Map and Reduce are introduced to process the data. The Map function takes an input key-value pair and produces a set of intermediate key-value pairs. The MapReduce runtime system then groups and sorts all the intermediate values associated with the same intermediate key, and sends them to the Reduce function. The Reduce function accepts an intermediate key and its corresponding values, applies the processing logic, and produces the final result, which is typically a list of values.

Hadoop is open source software that implements the MapReduce framework. Data in Hadoop are stored in HDFS by default. HDFS consists of multiple DataNodes for storing data and a master node called NameNode for monitoring DataNodes and maintaining all the meta-data. In HDFS, imported data will be split into equal-size chunks, and the NameNode allocates the data chunks to different DataNodes. The MapReduce runtime system establishes two processes, namely JobTracker and TaskTracker. The JobTracker splits a submitted job into map and reduce tasks and schedules the tasks among all the available TaskTrackers. TaskTrackers accept and process the assigned map/reduce tasks. For a map task, the TaskTracker takes a data chunk specified by the JobTracker and applies the map() function. When all the map() functions complete, the runtime system groups all the intermediate results and launches a number of reduce tasks to run the reduce() function and produce the final results. Both map() and reduce() functions are specified by the user.
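To make the roles of Map, the grouping step, and Reduce concrete, the following is a minimal single-process sketch in Python. It is not Hadoop code and does not model HDFS, JobTracker, or TaskTracker; `run_mapreduce`, `wc_map`, and `wc_reduce` are hypothetical names, and word counting is used only because it is the smallest familiar example.

```python
from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
    """Simulate one MapReduce job in memory (illustration only)."""
    groups = defaultdict(list)
    # Map phase: each input pair yields intermediate key-value pairs.
    for key, value in records:
        for k2, v2 in map_fn(key, value):
            groups[k2].append(v2)  # grouping by intermediate key
    # Reduce phase: one call per intermediate key, in sorted key order.
    return {k2: reduce_fn(k2, values) for k2, values in sorted(groups.items())}

def wc_map(_line_no, line):
    for word in line.split():
        yield word, 1

def wc_reduce(_word, counts):
    return sum(counts)

counts = run_mapreduce([(1, "knn join"), (2, "knn query")], wc_map, wc_reduce)
```

The user supplies only `map_fn` and `reduce_fn`; everything else belongs to the runtime, which is the division of labor described above.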

2.3 Voronoi Diagram-based Partitioning

Given a dataset O, the main idea of Voronoi diagram-based partitioning is to select M objects (which may not belong to O) as pivots, and then split the objects of O into M disjoint partitions, where each object is assigned to the partition with its closest pivot¹. In this way, the whole data space is split into M “generalized Voronoi cells”. Figure 1 shows an example of splitting objects into 5 partitions by employing the Voronoi diagram-based partitioning. For the sake of brevity, let P be the set of pivots selected. ∀p_i ∈ P, P_i^O denotes the set of objects from O that take p_i as their closest pivot. For an object o, let p_o and P_o^O be its closest pivot and the corresponding partition, respectively. In addition, we use U(P_i^O) and L(P_i^O) to denote the maximum and minimum distance from pivot p_i to the objects of P_i^O, i.e., U(P_i^O) = max{|o, p_i| : ∀o ∈ P_i^O}, L(P_i^O) = min{|o, p_i| : ∀o ∈ P_i^O}.

¹ In particular, if there exist multiple pivots that are closest to an object, then the object is assigned to the partition with the smallest number of objects.

Figure 1: An example of data partitioning
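The partitioning step and the per-partition minimum and maximum pivot distances can be sketched as follows. This is a simplified illustration: the names `voronoi_partition` and `partition_bounds` are hypothetical, and ties between equally close pivots are broken by the lowest pivot index rather than by partition size, which differs from the tie rule in the footnote. `math.dist` requires Python 3.8 or later.

```python
import math

def voronoi_partition(objects, pivots):
    """Assign every object to the partition of its closest pivot."""
    parts = {i: [] for i in range(len(pivots))}
    for o in objects:
        # Simplification: ties go to the lowest pivot index.
        i = min(range(len(pivots)), key=lambda j: math.dist(o, pivots[j]))
        parts[i].append(o)
    return parts

def partition_bounds(parts, pivots):
    """Return {partition id: (L, U)}, the min and max distance to the pivot."""
    bounds = {}
    for i, objs in parts.items():
        if objs:  # empty partitions have no bounds
            ds = [math.dist(o, pivots[i]) for o in objs]
            bounds[i] = (min(ds), max(ds))
    return bounds

# Hypothetical toy data on a line.
pivots = [(0.0, 0.0), (10.0, 0.0)]
objects = [(1.0, 0.0), (3.0, 0.0), (9.0, 0.0), (10.0, 2.0)]
parts = voronoi_partition(objects, pivots)
bounds = partition_bounds(parts, pivots)
```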

DEFINITION 3. (Range Selection) Given a dataset O, an object q, and a distance threshold θ, range selection of q from O is to find all objects (denoted as Ō) of O, such that ∀o ∈ Ō, |q, o| ≤ θ.

By splitting the dataset into a set of partitions, we can answer range selection queries based on the following theorem.

THEOREM 1. [8] Given two pivots p_i, p_j, let HP(p_i, p_j) be the generalized hyperplane, where any object o lying on HP(p_i, p_j) has equal distance to p_i and p_j. ∀o ∈ P_j^O, the distance of o to HP(p_i, p_j), denoted as d(o, HP(p_i, p_j)), is:

$$d(o, HP(p_i, p_j)) = \frac{|o, p_i|^2 - |o, p_j|^2}{2 \times |p_i, p_j|} \tag{3}$$

Figure 2(a) shows distance d(o, HP(p_i, p_j)). Given an object q, the partition P_q^O it belongs to, and another partition P_i^O, Theorem 1 lets us compute the distance from q to HP(p_q, p_i). Hence, we can derive the following corollary.

COROLLARY 1. Given a partition P_i^O with P_i^O ≠ P_q^O, if we can derive d(q, HP(p_q, p_i)) > θ, then ∀o ∈ P_i^O, |q, o| > θ.

Given a partition P_i^O, if we get d(q, HP(p_q, p_i)) > θ, then according to Corollary 1 we can discard all objects of P_i^O. Otherwise, we check only part of the objects of P_i^O based on Theorem 2.
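A small Python sketch of the hyperplane distance of Theorem 1 and the partition-level pruning test of Corollary 1 is given below, assuming Euclidean points stored as tuples. The function names `hp_distance` and `can_prune_partition` are hypothetical and not part of the original method description.

```python
import math

def hp_distance(o, p_own, p_other):
    """Distance from o to the hyperplane between its own pivot and another pivot.

    Follows Equation 3, with p_own the pivot of the partition containing o.
    """
    num = math.dist(o, p_other) ** 2 - math.dist(o, p_own) ** 2
    return num / (2 * math.dist(p_own, p_other))

def can_prune_partition(q, p_q, p_i, theta):
    """Corollary 1: the whole partition of p_i is farther than theta from q."""
    return hp_distance(q, p_q, p_i) > theta

# Hypothetical example: pivots at x = 0 and x = 10, so the hyperplane is x = 5.
d = hp_distance((8.0, 0.0), (10.0, 0.0), (0.0, 0.0))
```

In this example the object at x = 8 lies 3 units from the hyperplane, so the other partition can be discarded for any threshold below 3.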

THEOREM 2. [9, 20] Given a partition P_i^O, ∀o ∈ P_i^O, the necessary condition that |q, o| ≤ θ is:

$$\max\{L(P_i^O), |p_i, q| - \theta\} \le |p_i, o| \le \min\{U(P_i^O), |p_i, q| + \theta\} \tag{4}$$

Figure 2(b) shows an example of the bounding area of Equation 4. To answer range selections, we only need to check objects that lie in the bounding area of each partition.
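The filtering of Theorem 2 inside a single partition can be sketched as follows. The function name `range_select_in_partition` is hypothetical; in practice the distances from objects to their pivot would be precomputed rather than recomputed per query, which this sketch does only to stay self-contained.

```python
import math

def range_select_in_partition(q, theta, pivot, objs):
    """Return (hits, checked): objects within theta of q, and how many
    objects survived the Equation 4 filter and needed a real distance."""
    dists = [math.dist(o, pivot) for o in objs]  # precomputed in practice
    dq = math.dist(pivot, q)
    lo = max(min(dists), dq - theta)   # lower end of the bounding area
    hi = min(max(dists), dq + theta)   # upper end of the bounding area
    hits, checked = [], 0
    for o, d in zip(objs, dists):
        if lo <= d <= hi:
            checked += 1
            if math.dist(q, o) <= theta:
                hits.append(o)
    return hits, checked

# Hypothetical toy data: one partition with pivot at the origin.
objs = [(1.0, 0.0), (5.0, 0.0), (9.0, 0.0), (0.0, 5.0)]
hits, checked = range_select_in_partition((6.0, 0.0), 1.5, (0.0, 0.0), objs)
```

Here two of the four objects fall in the bounding area and only one of them is a true result, which shows that the condition is necessary but not sufficient.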

3. AN OVERVIEW OF KNN JOIN USING MAPREDUCE

Figure 2: Properties of data partitioning. (a) d(o, HP(p_i, p_j)); (b) bounding area of Equation 4

In MapReduce, the mappers produce key-value pairs based on the input data; each reducer performs a specific task on a group of pairs with the same key. In essence, the mappers do something similar to (typically more than) a hashing function. A naive and straightforward idea of performing kNN join in MapReduce is similar to the hash join algorithm.

Specifically, the map() function assigns each object r ∈ R a key; based on the key, R is split into disjoint subsets, i.e., R = ∪_{1≤i≤N} R_i, where R_i ∩ R_j = ∅, i ≠ j; each subset R_i is distributed to a reducer. Without any pruning rule, the entire set S has to be sent to each reducer to be joined with R_i; finally R ⋉ S = ∪_{1≤i≤N} R_i ⋉ S.

In this scenario, there are two major considerations that affect the performance of the entire join process.

1. The shuffling cost of sending intermediate results from mappers to reducers.

2. The cost of performing the kNN join on the reducers.

Obviously, the basic strategy is too expensive. Each reducer performs the kNN join between a subset of R and the entire S. Given a large population of S, it may go beyond the capability of the reducer. An alternative framework [21], called H-BRJ, splits both R and S into √N disjoint subsets, i.e., R = ∪_{1≤i≤√N} R_i, S = ∪_{1≤j≤√N} S_j. Similarly, the partitioning of R and S in H-BRJ is performed by the map() function; a reducer performs the kNN join between a pair of subsets R_i and S_j; finally, the join results of all pairs of subsets are merged and R ⋉ S = ∪_{1≤i,j≤√N} R_i ⋉ S_j. In H-BRJ, R and S are partitioned into equal-sized subsets on a random basis.

While the basic strategy can produce the join result using one MapReduce job, H-BRJ requires two MapReduce jobs. Since the set S is partitioned into several subsets, the join result produced by each reducer in the first job is incomplete, and another MapReduce job is required to combine the results of R_i ⋉ S_j for all 1 ≤ j ≤ √N. Therefore, the shuffling cost of H-BRJ is √N · (|R| + |S|) + Σ_i Σ_j |R_i ⋉ S_j| (see footnote 2), while for the basic strategy, it is |R| + N · |S|.

In order to reduce the shuffling cost, a better strategy is to partition R into N disjoint subsets and, for each subset R_i, find a subset S_i of S such that R_i ⋉ S = R_i ⋉ S_i and R ⋉ S = ∪_{1≤i≤N} R_i ⋉ S_i. Then, instead of sending the entire S to each reducer (as in the basic strategy) or sending each R_i to √N reducers, S_i is sent to the reducer that R_i belongs to and the kNN join is performed between R_i and S_i only.

² √N · (|R| + |S|) is the shuffling cost of the first MapReduce job. Σ_i Σ_j |R_i ⋉ S_j| is the shuffling cost of the second MapReduce job for merging the partial results.

Figure 3: An overview of kNN join in MapReduce (preprocessing with pivot selection, a first MapReduce job that partitions R and S and builds the summary tables T_R and T_S, and a second MapReduce job that outputs the k nearest neighbors of each object)

This approach avoids replication on the set R and sending the entire set S to all reducers. However, to guarantee the correctness of the kNN join, the subset S_i must contain the k nearest neighbors of every r ∈ R_i, i.e., ∀r ∈ R_i, KNN(r, S) ⊆ S_i. Note that S_i ∩ S_j may not be empty, as it is possible that an object s is one of the k nearest neighbors of both r_i ∈ R_i and r_j ∈ R_j. Hence, some of the objects in S should be replicated and distributed to multiple reducers. The shuffling cost is |R| + α · |S|, where α is the average number of replicas of an object in S. Apparently, if we can reduce the value of α, both the shuffling cost and the computational cost can be reduced.
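The three shuffling costs discussed in this section can be compared with a short Python sketch. The function names are hypothetical, and costs are counted in objects or result pairs rather than bytes. For H-BRJ, the sketch assumes that N is a perfect square and that every S_j holds at least k objects, so that each R_i ⋉ S_j contains k · |R_i| pairs and the double sum equals √N · k · |R|; this simplification is an assumption of the example, not a statement from the text.

```python
import math

def shuffle_cost_basic(r_size, s_size, n):
    """Basic strategy: R is split once, S is sent to all N reducers."""
    return r_size + n * s_size

def shuffle_cost_hbrj(r_size, s_size, n, k):
    """H-BRJ: first job plus merging of the partial results (assumed form)."""
    root = math.isqrt(n)  # assumes N is a perfect square
    return root * (r_size + s_size) + root * k * r_size

def shuffle_cost_replicated(r_size, s_size, alpha):
    """Strategy of this section: each object of S is sent alpha times on average."""
    return r_size + alpha * s_size

# Hypothetical sizes.
basic = shuffle_cost_basic(1000, 2000, 16)
hbrj = shuffle_cost_hbrj(1000, 2000, 16, 10)
repl = shuffle_cost_replicated(1000, 2000, 2.0)
```

The last formula makes clear why the rest of the paper concentrates on keeping α small: it is the only term that depends on how R is partitioned.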

In summary, for the purpose of minimizing the join cost, we need to

1. find a good partitioning of R;

2. find the minimal set S_i for each R_i ⊂ R, given a partitioning of R³.

Intuitively, a good partitioning of R should cluster objects in R based on their proximity, so that the objects in a subset R_i are more likely to share common k nearest neighbors from S. For each R_i, the objects in the corresponding S_i are then cohesive, leading to a smaller size of S_i. Therefore, such partitioning not only leads to a lower shuffling cost, but also reduces the computational cost of performing the kNN join between each R_i and S_i, i.e., the number of distance calculations.

4. HANDLING KNN JOIN USING MAPREDUCE

In this section, we introduce our implementation of kNN join using MapReduce. First, Figure 3 illustrates the working flow of our kNN join, which consists of one preprocessing step and two MapReduce jobs.

³ The minimum set of S_i is S_i = ∪_{1≤j≤|R_i|} KNN(r_j, S). However, it is impossible to find out the k nearest neighbors for all r_j a priori.

• First, the preprocessing step finds a set of pivot objects based on the input dataset R. The pivots are used to create a Voronoi diagram, which can help partition objects in R effectively while preserving their proximity.

• The first MapReduce job consists of a single Map phase, which takes the selected pivots and datasets R and S as the input. It finds the nearest pivot for each object in R ∪ S and computes the distance between the object and the pivot. The result of the mapping phase is a partitioning on R, based on the Voronoi diagram of the pivots. Meanwhile, the mappers also collect some statistics about each partition R_i.

• Given the partitioning on R, mappers of the second MapReduce job find the subset S_i of S for each subset R_i based on the statistics collected in the first MapReduce job. Finally, each reducer performs the kNN join between a pair of R_i and S_i received from the mappers.

4.1 Data Preprocessing

As mentioned in the previous section, a good partitioning of R for optimizing kNN join should cluster objects based on their proximity. We adopt the Voronoi diagram-based data partitioning technique as reviewed in Section 2, which is well-known for maintaining data proximity, especially for data in multi-dimensional space. Therefore, before launching the MapReduce jobs, a preprocessing step is invoked in a master node for selecting a set of pivots to be used for Voronoi diagram-based partitioning. In particular, the following three strategies can be employed to select pivots.

• Random Selection. First, T random sets of objects are selected from R. Then, for each set, we compute the total sum of the distances between every two objects. Finally, the objects from the set with the maximum total sum distance are selected as the pivots for data partitioning.

• Farthest Selection. The set of pivots is selected iteratively from a sample of the original dataset R (since the preprocessing procedure is executed on a master node, the original dataset may be too large for it to process). First, we randomly select an object as the first pivot. Next, the object with the largest distance to the first pivot is selected as the second pivot. In the i-th iteration, the object that maximizes the sum of its distances to the first i − 1 pivots is chosen as the i-th pivot.

• k-means Selection. Similar to the farthest selection, k-means selection first samples R. Then, the traditional k-means clustering method is applied to the sample. With the k data clusters generated, the center point of each cluster is chosen as a pivot for the Voronoi diagram-based data partitioning.
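Of the three strategies, farthest selection is the easiest to sketch. The Python fragment below is a minimal illustration under the assumption that the sample fits in memory and contains at least m distinct objects; the function name `farthest_selection` and the `seed` parameter are hypothetical.

```python
import math
import random

def farthest_selection(sample, m, seed=0):
    """Pick m pivots: a random first pivot, then repeatedly the object that
    maximizes the sum of its distances to the pivots chosen so far."""
    rng = random.Random(seed)
    pivots = [rng.choice(sample)]
    while len(pivots) < m:
        candidates = [o for o in sample if o not in pivots]
        best = max(candidates,
                   key=lambda o: sum(math.dist(o, p) for p in pivots))
        pivots.append(best)
    return pivots

# Hypothetical sample of R.
sample = [(0.0, 0.0), (1.0, 0.0), (10.0, 0.0), (5.0, 8.0)]
pivots = farthest_selection(sample, 3)
```

Each iteration scans the whole sample once per chosen pivot, which is one reason the strategy is applied to a sample rather than to all of R.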

4.2 First MapReduce Job

Given the set of pivots selected in the preprocessing step, we launch a MapReduce job for performing data partitioning and collecting some statistics for each partition. Figure 4 shows an example of the input/output of the mapper function of the first MapReduce job.

Specifically, before launching the map function, the selected pivots P are loaded into main memory in each mapper. A mapper sequentially reads each object o from the input split, computes the distance between o and all pivots in P, and assigns o to the partition of its closest pivot. Finally, as illustrated in Figure 4, the mapper outputs each object o along with its partition id, original dataset name (R or S), and distance to the closest pivot.

Meanwhile, the first map function also collects some statistics for each input data split, and these statistics are merged together when the MapReduce job completes. Two in-memory tables called summary tables are created to keep these statistics. Figure 3 shows an example of the summary tables T_R and T_S for partitions of R and S, respectively. Specifically, T_R maintains the following information for every partition of R: the partition id, the number of objects in the partition, and the minimum distance L(P_i^R) and maximum distance U(P_i^R) from an object in partition P_i^R to the pivot. Note that although the pivots are selected based on dataset R alone, the Voronoi diagram based on the pivots can be used to partition S as well. T_S maintains the same fields as those in T_R for S. Moreover, T_S also maintains the distances between objects in KNN(p_i, P_i^S) and p_i, where KNN(p_i, P_i^S) refers to the k nearest neighbors of pivot p_i from objects in partition P_i^S. In Figure 3, p_i.d_j in T_S represents the distance between pivot p_i and its j-th nearest neighbor in KNN(p_i, P_i^S). The information in T_R and T_S will be used to guide how to generate S_i for R_i as well as to speed up the computation of R_i ⋉ S_i by deriving distance bounds of the kNN for any object of R in the second MapReduce job.
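The mapper output and the two summary tables can be sketched in Python as below. This is a single-process illustration, not the Hadoop implementation: the record layout, the dictionary fields `count`, `L`, `U`, and `knn_dists`, and the function names `first_job_map` and `build_summary_tables` are all assumptions made for the example.

```python
import math
from collections import defaultdict

def first_job_map(records, pivots):
    """Yield (partition id, dataset, distance to pivot, object) per record.

    records: iterable of (dataset, obj) with dataset equal to "R" or "S".
    """
    for dataset, obj in records:
        pid = min(range(len(pivots)), key=lambda i: math.dist(obj, pivots[i]))
        yield pid, dataset, math.dist(obj, pivots[pid]), obj

def build_summary_tables(mapped, k):
    """Build T_R and T_S; T_S also keeps the k smallest pivot distances."""
    t_r = defaultdict(lambda: {"count": 0, "L": math.inf, "U": 0.0})
    t_s = defaultdict(lambda: {"count": 0, "L": math.inf, "U": 0.0,
                               "knn_dists": []})
    for pid, dataset, d, _obj in mapped:
        row = (t_r if dataset == "R" else t_s)[pid]
        row["count"] += 1
        row["L"] = min(row["L"], d)
        row["U"] = max(row["U"], d)
        if dataset == "S":
            # Keep the k smallest distances in ascending order.
            row["knn_dists"] = sorted(row["knn_dists"] + [d])[:k]
    return dict(t_r), dict(t_s)

# Hypothetical toy input.
pivots = [(0.0, 0.0), (10.0, 0.0)]
records = [("R", (1.0, 0.0)), ("R", (3.0, 0.0)), ("S", (2.0, 0.0)),
           ("S", (0.0, 1.0)), ("S", (4.0, 0.0)), ("S", (9.0, 0.0))]
t_r, t_s = build_summary_tables(first_job_map(records, pivots), k=2)
```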

4.3 Second MapReduce Job

The second MapReduce job performs the kNN join in the way introduced in Section 3. The main task of the mapper in the second MapReduce job is to find the corresponding subset S_i for each R_i. Each reducer performs the kNN join between a pair of R_i and S_i.

As mentioned previously, to guarantee the correctness, S_i should contain the kNN of all r ∈ R_i, i.e., S_i = ∪_{∀r_j ∈ R_i} KNN(r_j, S). However, we cannot get the exact S_i without performing the kNN join on R_i and S. Therefore, in the following, we derive a distance bound based on the partitioning of R which can help us reduce the size of S_i.

4.3.1 Distance Bound of kNN

Instead of computing the kNN from S for each object of R, we derive a bound of the kNN distance using a set-oriented approach. Given a partition P_i^R (i.e., R_i) of R, we bound the distance of the kNN for all objects of P_i^R at a time based on T_R and T_S, which we have as a byproduct of the first MapReduce job.

Figure 4: Partitioning and building the summary tables

THEOREM 3. Given a partition P_i^R ⊂ R and an object s of P_j^S ⊂ S, the upper bound distance from s to ∀r ∈ P_i^R, denoted as ub(s, P_i^R), is:

$$ub(s, P_i^R) = U(P_i^R) + |p_i, p_j| + |p_j, s| \tag{5}$$

PROOF. ∀r ∈ P_i^R, according to the triangle inequality, |r, p_j| ≤ |r, p_i| + |p_i, p_j|. Similarly, |r, s| ≤ |r, p_j| + |p_j, s|. Hence, |r, s| ≤ |r, p_i| + |p_i, p_j| + |p_j, s|. Since r ∈ P_i^R, according to the definition of U(P_i^R), |r, p_i| ≤ U(P_i^R). Clearly, we can derive |r, s| ≤ U(P_i^R) + |p_i, p_j| + |p_j, s| = ub(s, P_i^R).

Figure 5(a) shows the geometric meaning of ub(s, P_i^R). According to Equation 5, we can find a set of k objects from S with the smallest upper bound distances as the kNN of all objects in P_i^R. For ease of exposition, let KNN(P_i^R, S) be the k objects from S with the smallest ub(s, P_i^R). Apparently, we can derive a bound (denoted as θ_i, corresponding to P_i^R) of the kNN distance for all objects in P_i^R as follows:

$$\theta_i = \max_{\forall s \in KNN(P_i^R, S)} |ub(s, P_i^R)| \tag{6}$$

Clearly, ∀r ∈ P_i^R, the distance from r to any object of KNN(r, S) is less than or equal to θ_i. Hence, we are able to bound the distance of the kNN for all objects of P_i^R at a time. Moreover, according to Equation 5, we can also observe that in each partition P_j^S, only the k objects with the smallest distances to p_j may contribute to refining KNN(P_i^R, S), while the remainder cannot. Hence, we only maintain the k smallest distances of objects from each partition of S to its corresponding pivot in summary table T_S (shown in Figure 3).

Algorithm 1: boundingKNN(P_i^R)

1 create a priority queue PQ;
2 foreach P_j^S do
3   foreach s ∈ KNN(p_j, P_j^S) do /* set in T_S */
4     ub(s, P_i^R) ← U(P_i^R) + |p_i, p_j| + |s, p_j|;
5     if PQ.size < k then PQ.add(ub(s, P_i^R));
6     else if PQ.top > ub(s, P_i^R) then
7       PQ.remove(); PQ.add(ub(s, P_i^R));
8     else break;
9 return PQ.top;

Algorithm 1 shows the details on how to compute θ_i. We first create a priority queue PQ with size k (line 1). For partition P_j^S, we compute ub(s, P_i^R) for each s ∈ KNN(p_j, P_j^S), where |s, p_j| is maintained in T_S. To speed up the computation of θ_i, we maintain |s, p_j| in T_S in ascending order. Hence, when ub(s, P_i^R) ≥ PQ.top, we can guarantee that no remaining objects in KNN(p_j, P_j^S) help refine θ_i (line 8). Finally, we return the top of PQ, which is taken as θ_i (line 9).

Figure 5: Bounding k nearest neighbors. (a) upper bound; (b) lower bound
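The bounding procedure of Algorithm 1 can be sketched in Python with a max-heap of size k. This is an illustrative reading of the pseudocode rather than the authors' code: the function name `bounding_knn`, the argument layout, and the representation of T_S as a dictionary of ascending distance lists are assumptions, and the sketch presumes that S contributes at least one distance.

```python
import heapq
import math

def bounding_knn(u_i, p_i, pivots, ts_knn_dists, k):
    """Return theta_i, the k-th smallest upper bound ub(s, P_i^R).

    u_i: U(P_i^R); p_i: pivot of the R partition;
    ts_knn_dists: {j: ascending list of the k smallest |s, p_j| in P_j^S}.
    """
    heap = []  # max-heap emulated by storing negated upper bounds
    for j, dists in ts_knn_dists.items():
        base = u_i + math.dist(p_i, pivots[j])
        for d in dists:
            ub = base + d  # Equation 5
            if len(heap) < k:
                heapq.heappush(heap, -ub)
            elif -heap[0] > ub:
                heapq.heapreplace(heap, -ub)
            else:
                break  # distances are ascending, so no later s can help
    return -heap[0]

# Hypothetical summary data: two partitions of S.
pivots = [(0.0, 0.0), (10.0, 0.0)]
ts = {0: [1.0, 2.0], 1: [1.0]}
theta_k2 = bounding_knn(3.0, pivots[0], pivots, ts, k=2)
theta_k3 = bounding_knn(3.0, pivots[0], pivots, ts, k=3)
```

With k = 2 the distant partition is abandoned after its first entry, which is the early termination described for line 8.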

4.3.2 Finding S_i for R_i

Similarly to Theorem 3, we can derive the lower bound distance from an object s ∈ P_j^S to any object of P_i^R as follows.

THEOREM 4. Given a partition P_i^R and an object s of P_j^S, the lower bound distance from s to ∀r ∈ P_i^R, denoted by lb(s, P_i^R), is:

$$lb(s, P_i^R) = \max\{0, |p_i, p_j| - U(P_i^R) - |s, p_j|\} \tag{7}$$

PROOF. ∀r ∈ P_i^R, according to the triangle inequality, |r, p_j| ≥ |p_j, p_i| − |p_i, r|. Similarly, |r, s| ≥ |r, p_j| − |p_j, s|. Hence, |r, s| ≥ |p_j, p_i| − |p_i, r| − |p_j, s|. Since r ∈ P_i^R, according to the definition of U(P_i^R), |r, p_i| ≤ U(P_i^R). Thus we can derive |r, s| ≥ |p_i, p_j| − U(P_i^R) − |s, p_j|. As the distance between any two objects is not less than 0, the lower bound distance lb(s, P_i^R) is set to max{0, |p_i, p_j| − U(P_i^R) − |s, p_j|}.

Figure 5(b) shows the geometric meaning of lb(s, P_i^R). Clearly, ∀s ∈ S, if we can verify lb(s, P_i^R) > θ_i, then s cannot be one of KNN(r, S) for any r ∈ P_i^R and s is safe to be pruned. Hence, it is easy for us to verify whether an object s ∈ S needs to be assigned to S_i.

THEOREM 5. Given a partition P_i^R and an object s ∈ S, the necessary condition that s is assigned to S_i is that lb(s, P_i^R) ≤ θ_i.

According to Theorem 5, ∀s ∈ S, by computing lb(s, P_i^R) for all P_i^R ⊂ R, we can derive all S_i that s is assigned to. However, when the number of partitions for R is large, this computation cost might increase significantly since ∀s ∈ P_j^S, we need to compute |p_i, p_j|. To cope with this problem, we propose Corollary 2 to find all S_i which s is assigned to based only on |s, p_j|.

COROLLARY 2. Given a partition P_i^R and a partition P_j^S, ∀s ∈ P_j^S, the necessary condition that s is assigned to S_i is that:

$$|s, p_j| \ge LB(P_j^S, P_i^R) \tag{8}$$

where LB(P_j^S, P_i^R) = |p_i, p_j| − U(P_i^R) − θ_i.

PROOF. The conclusion directly follows from Theorem 5 and Equation 7.

According to Corollary 2, for partition P_j^S, exactly the objects lying in the region [LB(P_j^S, P_i^R), U(P_j^S)] are assigned to S_i. Algorithm 2 shows how to compute LB(P_j^S, P_i^R), which is self-explanatory.
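The replication rule of Corollary 2 reduces to one precomputed table and one comparison per object of S. The Python sketch below illustrates this under assumed inputs: `u_r` and `theta` map each partition id of R to U(P_i^R) and θ_i, and the function names `comp_lb_of_replica` and `target_partitions` are hypothetical.

```python
import math

def comp_lb_of_replica(pivots, u_r, theta):
    """LB[j][i] = |p_i, p_j| - U(P_i^R) - theta_i for all pairs (j, i)."""
    return {j: {i: math.dist(pivots[i], pivots[j]) - u_r[i] - theta[i]
                for i in u_r}
            for j in range(len(pivots))}

def target_partitions(pid_j, dist_to_pivot, lb):
    """Ids i such that an object of P_j^S at this pivot distance joins S_i."""
    return [i for i, bound in lb[pid_j].items() if dist_to_pivot >= bound]

# Hypothetical statistics for two partitions.
pivots = [(0.0, 0.0), (10.0, 0.0)]
u_r = {0: 3.0, 1: 2.0}
theta = {0: 5.0, 1: 4.0}
lb = comp_lb_of_replica(pivots, u_r, theta)
near = target_partitions(1, 1.0, lb)  # object close to pivot 1
far = target_partitions(1, 3.0, lb)   # object farther from pivot 1
```

An object near its own pivot is sent only to its own reducer, whereas an object closer to the cell boundary is replicated, which is the source of the replication factor α.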

4.3.3 kNN Join between R_i and S_i

As a summary, Algorithm 3 describes the details of the kNN join procedure performed in the second MapReduce job. Before launching the map function, we first compute LB(P_j^S, P_i^R) for every P_j^S (lines 1–2). For each object r ∈ R, the map function generates a new key-value pair in which the key is its partition id, and the value consists of k1 and v1 (lines 4–6). For each object s ∈ S, the map function creates a set of new key-value pairs, if not pruned based on Corollary 2 (lines 7–11).

Algorithm 2: compLBOfReplica()

1 foreach P_i^R do
2   θ_i ← boundingKNN(P_i^R);
3 foreach P_j^S do
4   foreach P_i^R do
5     LB(P_j^S, P_i^R) ← |p_i, p_j| − U(P_i^R) − θ_i;

Algorithm 3: kNN join

1 map-setup /* before running map function */
2   compLBOfReplica();
3 map (k1, v1)
4   if k1.dataset = R then
5     pid ← getPartitionID(k1.partition);
6     output(pid, (k1, v1));
7   else
8     P_j^S ← k1.partition;
9     foreach P_i^R do
10      if LB(P_j^S, P_i^R) ≤ k1.dist then
11        output(i, (k1, v1));
12 reduce (k2, v2) /* at the reducing phase */
13   parse P_i^R and S_i (P_{j1}^S, ..., P_{jM}^S) from (k2, v2);
14   sort P_{j1}^S, ..., P_{jM}^S in ascending order of |p_i, p_{jl}|;
15   compute θ_i ← max_{∀s ∈ KNN(P_i^R, S)} |ub(s, P_i^R)|;
16   for r ∈ P_i^R do
17     θ ← θ_i; KNN(r, S) ← ∅;
18     for j ← j1 to jM do
19       if P_j^S can be pruned by Corollary 1 then
20         continue;
21       foreach s ∈ P_j^S do
22         if s is not pruned by Theorem 2 then
23           refine KNN(r, S) by s;
24           θ ← max_{∀o ∈ KNN(r, S)} {|o, r|};
25     output(r, KNN(r, S));

In this way, objects in each partition of R and their potential k nearest neighbors will be sent to the same reducer. By parsing the key-value pair (k2, v2), the reducer can derive the partition P_i^R and the subset S_i that consists of P_{j1}^S, ..., P_{jM}^S (line 13), and compute the kNN of objects in partition P_i^R (lines 16–25).

$\forall r \in P_i^R$, in order to reduce the number of distance computations, we first sort the partitions of $S_i$ by the distances from their pivots to pivot $p_i$ in ascending order (line 14). This is based on the fact that if a pivot is near $p_i$, its corresponding partition often has a higher probability of containing objects that are close to r. In this way, we can derive a tighter bound on the kNN distance for every object of $P_i^R$, leading to higher pruning power. Based on Equation 6, we can derive a bound of the kNN distance, $\theta_i$, for all objects of $P_i^R$. Hence, we can issue a range search with query r and threshold $\theta_i$ over dataset $S_i$. First, KNN(r, S) is set to empty (line 17). Then, all partitions $P_j^S$ are checked one by one (lines 18–24). For each partition $P_j^S$, based on Corollary 1, if $d(r, HP(p_i, p_j)) > \theta$, no object in $P_j^S$ can help refine KNN(r, S), and we proceed to check the next partition directly (lines 19–20). Otherwise, $\forall s \in P_j^S$, if s cannot be pruned by Theorem 2, we need to compute the distance $|r, s|$. If $|r, s| < \theta$, KNN(r, S) is updated with s and θ is updated accordingly (lines 22–24). After checking all partitions of $S_i$, the reducer outputs KNN(r, S) (line 25).
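The refinement loop for a single object r can be sketched as follows. This is only a sketch under stated assumptions: the hyperplane test of Corollary 1 is omitted, and a plain triangle-inequality filter on the stored pivot distances stands in for Theorem 2, whose exact statement lies outside this section. The function name `knn_in_reducer` and the data layout are hypothetical, and Euclidean distance is assumed.

```python
import heapq
import math


def knn_in_reducer(r, s_partitions, k, theta_i=math.inf):
    """Return the k nearest neighbours of r as a sorted list of (distance, s).

    s_partitions: list of (pivot, [(s, |s, pivot|), ...]), assumed to be sorted
                  already by the distance from each pivot to the pivot of r's partition.
    theta_i:      a valid upper bound on the kNN distance of r (inf if unknown).
    """
    heap = []        # max-heap of the current candidates, stored as (-distance, s)
    theta = theta_i
    for pivot, objects in s_partitions:
        d_rp = math.dist(r, pivot)
        for s, d_sp in objects:
            # Triangle inequality: |r, s| >= | |r, p_j| - |s, p_j| |,
            # so s cannot improve the result if this lower bound exceeds theta.
            if abs(d_rp - d_sp) > theta:
                continue
            d = math.dist(r, s)
            if len(heap) < k:
                heapq.heappush(heap, (-d, s))
            elif d < -heap[0][0]:
                heapq.heapreplace(heap, (-d, s))
            if len(heap) == k:
                # Tighten theta to the current k-th smallest distance.
                theta = min(theta, -heap[0][0])
    return sorted((-neg_d, s) for neg_d, s in heap)
```

Keeping the candidates in a bounded max-heap makes the update of θ a constant-time lookup of the heap root, which is one possible way to realize lines 23–24 of Algorithm 3.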

5. MINIMIZING REPLICATION OF S

By bounding the k nearest neighbors for all objects in partition $P_i^R$, according to Corollary 2, $\forall s \in P_j^S$, we assign s to $S_i$ when $|s, p_j| \ge LB(P_j^S, P_i^R)$. Clearly, to minimize the number of replicas of objects in S, we want a large $LB(P_j^S, P_i^R)$ together with a small $|s, p_j|$. Intuitively, by selecting a larger number of pivots, we can split the dataset into a set of Voronoi cells (corresponding to partitions) with finer granularity, and the bound of the kNN distance for all objects in each partition of R becomes tighter. This observation can be confirmed by Equation 8. By increasing the number of pivots, each object from $R \cup S$ can be assigned to a pivot at a smaller distance, which reduces both $|s, p_j|$ and the upper bound $U(P_i^R)$ for each partition $P_i^R$, while a smaller $U(P_i^R)$ helps achieve a larger $LB(P_j^S, P_i^R)$. Hence, in order to minimize the replicas of objects in S, a larger number of pivots should be selected. However, with many pivots it might not be practical to provide a single reducer for each partition $P_i^R$. To cope with this problem, a natural idea is to divide the partitions of R into disjoint groups and take each group as $R_i$. In this way, $S_i$ needs to be refined accordingly.

5.1 Cost Model

By default, let $R = \bigcup_{1 \le i \le N} G_i$, where $G_i$ is a group consisting of a set of partitions of R and $G_i \cap G_j = \emptyset$ for $i \ne j$.

THEOREM 6. Given partition $P_j^S$ and group $G_i$, $\forall s \in P_j^S$, the necessary condition that s is assigned to $S_i$ is:

$|s, p_j| \ge LB(P_j^S, G_i)$, (9)

where $LB(P_j^S, G_i) = \min_{\forall P_i^R \in G_i} LB(P_j^S, P_i^R)$.

PROOF. According to Corollary 2, s is assigned to $S_i$ as long as there exists a partition $P_i^R \in G_i$ with $|s, p_j| \ge LB(P_j^S, P_i^R)$.

By computing $LB(P_j^S, G_i)$ in advance for each partition $P_j^S$, we can derive all $S_i$ for each $s \in P_j^S$ based only on $|s, p_j|$. The average number of replicas of objects in S is reduced, since duplicates in $S_i$ are eliminated. According to Theorem 6, we can easily derive the total number of replicas (denoted as $RP(S)$) as follows.

THEOREM 7. The number of replicas of objects in S that are distributed to reducers is:

$RP(S) = \sum_{\forall G_i} \sum_{\forall P_j^S} |\{s \mid s \in P_j^S \wedge |s, p_j| \ge LB(P_j^S, G_i)\}|$ (10)
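Equation 10 can be evaluated directly once the pairwise lower bounds are known. The following Python sketch counts the replicas for a given grouping; the dictionary layout and the names `group_lb` and `replica_count` are assumptions made for illustration.

```python
def group_lb(lb, j, group):
    """LB(P_j^S, G_i): the minimum of LB(P_j^S, P_i^R) over all P_i^R in the group."""
    return min(lb[(j, i)] for i in group)


def replica_count(groups, s_pivot_dists, lb):
    """RP(S) as in Equation 10.

    groups:        list of groups, each a list of R-partition ids
    s_pivot_dists: {j: [|s, p_j| for every object s in P_j^S]}
    lb:            {(j, i): LB(P_j^S, P_i^R)}
    """
    total = 0
    for group in groups:
        for j, dists in s_pivot_dists.items():
            bound = group_lb(lb, j, group)
            # Every s with |s, p_j| >= LB(P_j^S, G_i) is replicated to this group.
            total += sum(1 for d in dists if d >= bound)
    return total
```

For example, with `lb = {(0, 0): -3.0, (0, 1): 5.5, (1, 0): 7.0, (1, 1): -4.5}` and `s_pivot_dists = {0: [1.0, 6.0], 1: [2.0, 8.0]}`, treating the two partitions of R as separate groups yields 6 replicas, whereas merging them into one group yields 4, because an object needed by both partitions is then shipped only once.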

5.2 Grouping Strategies

We present two strategies for grouping partitions of R to approximately minimize $RP(S)$.

Algorithm 4: geoGrouping()

1 select $p_i$ such that $\sum_{p_j \in P} |p_i, p_j|$ is maximized;
2 τ ← {$p_i$}; $G_1$ ← {$P_i^R$}; P ← P − {$p_i$};
3 for i ← 2 to N do
4     select $p_l \in P$ such that $\sum_{p_j \in \tau} |p_l, p_j|$ is maximized;
5     $G_i$ ← {$P_l^R$}; P ← P − {$p_l$}; τ ← τ ∪ {$p_l$};
6 while P ≠ ∅ do
7     select group $G_i$ with the smallest number of objects;
8     select $p_l \in P$ such that $\sum_{\forall P_j^R \subset G_i} |p_l, p_j|$ is minimized;
9     $G_i$ ← $G_i$ ∪ {$P_l^R$}; P ← P − {$p_l$};
10 return {$G_1, G_2, \ldots, G_N$}

5.2.1 Geometric Grouping

Geometric grouping is based on an important observation: given two partitions $P_i^R$ and $P_j^S$, if $p_j$ is far away from $p_i$ compared with the remaining pivots, then $P_j^S$ is deemed to have a low possibility of containing objects that are among the kNN of objects in $P_i^R$. This observation can be confirmed in Figure 1, where partition $P_5$ has no objects that are among the kNN of objects in $P_2$. Hence, a natural idea for dividing the partitions of R is to put the partitions whose corresponding pivots are near each other into the same group. In this way, regarding group $G_i$, objects of partitions from S that are far away from the partitions of $G_i$ have a high probability of being pruned.

Algorithm 4 shows the details of geometric grouping. We first select the pivot $p_i$ with the farthest distance to all the other pivots (line 1) and assign partition $P_i^R$ to group $G_1$ (line 2). We then sequentially assign a partition to each of the remaining groups as follows: for group $G_i$ ($2 \le i \le N$), we compute the pivot $p_l$ that has the farthest distance to the selected pivots (τ) and assign $P_l^R$ to $G_i$ (lines 3–5). In this way, we ensure that the groups are the farthest apart from each other at the initial phase. After assigning the first partition to each group, in order to balance the workload, we repeat the following steps until all partitions are assigned to groups: (1) select the group $G_i$ with the smallest number of objects (line 7); (2) compute the pivot $p_l$ with the minimum distance to the pivots of $G_i$, and assign $P_l^R$ to $G_i$ (lines 8–9). In this way, the number of objects in each group is nearly the same. Finally, we return all groups that maintain partitions of R (line 10).
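The steps of Algorithm 4 can be sketched in Python as follows. This is an illustrative sketch rather than the implementation used in the experiments: it assumes Euclidean pivots, breaks ties by the smallest partition id, and uses the hypothetical name `geo_grouping`.

```python
import math


def geo_grouping(pivots, sizes, n_groups):
    """Sketch of Algorithm 4.

    pivots:   {partition id: pivot coordinates}
    sizes:    {partition id: number of objects of R in the partition}
    n_groups: N, the number of groups (assumed <= number of pivots)
    Returns a list of N groups, each a list of partition ids.
    """
    remaining = set(pivots)

    def total_dist(p, others):
        return sum(math.dist(pivots[p], pivots[q]) for q in others)

    # Lines 1-2: seed G_1 with the pivot farthest from all the other pivots.
    first = max(sorted(remaining), key=lambda p: total_dist(p, remaining))
    groups, chosen = [[first]], [first]
    remaining.remove(first)

    # Lines 3-5: seed every other group with the pivot farthest from those chosen so far.
    for _ in range(1, n_groups):
        nxt = max(sorted(remaining), key=lambda p: total_dist(p, chosen))
        groups.append([nxt])
        chosen.append(nxt)
        remaining.remove(nxt)

    # Lines 6-9: grow the currently smallest group with the pivot closest to its members.
    while remaining:
        g = min(groups, key=lambda grp: sum(sizes[p] for p in grp))
        nxt = min(sorted(remaining), key=lambda p: total_dist(p, g))
        g.append(nxt)
        remaining.remove(nxt)
    return groups
```

For four equally sized partitions with pivots at (0, 0), (1, 0), (9, 0) and (10, 0) and N = 2, the sketch seeds the groups with the two extreme pivots and then attaches each inner pivot to its nearest seed, giving the groups [0, 1] and [3, 2].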

5.2.2 Greedy Grouping

Let $RP(S, G_i)$ be the set of objects from S that need to be assigned to $S_i$. The objective of greedy grouping is to minimize the size of $RP(S, G_i \cup \{P_j^R\}) - RP(S, G_i)$ when assigning a new partition $P_j^R$ to $G_i$. According to Theorem 6, $RP(S, G_i)$ can be formally quantified as:

$RP(S, G_i) = \bigcup_{\forall P_j^S \subset S} \{s \mid s \in P_j^S \wedge |s, p_j| \ge LB(P_j^S, G_i)\}$ (11)

Hence, theoretically, when implementing the greedy grouping approach, we can achieve the optimization objective by minimizing $RP(S, G_i \cup \{P_j^R\}) - RP(S, G_i)$ instead of $\sum_{P_j^R \in G_i} |p_i, p_j|$ as in the geometric grouping approach. However, it is rather costly to select, from all remaining partitions, the partition $P_j^R$ with the minimum $RP(S, G_i \cup \{P_j^R\}) - RP(S, G_i)$. This is because, by adding a new partition $P_j^R$ to $G_i$, we need to count the number of newly emerging objects from S that are distributed to $S_i$. Hence, to reduce the computation cost, once $\exists s \in P_l^S$ with $|s, p_l| \ge LB(P_l^S, G_i)$, we add all objects of partition $P_l^S$ to $RP(S, G_i)$, i.e., $RP(S, G_i)$ is approximately quantified as:

$RP(S, G_i) \approx \bigcup_{\forall P_j^S \subset S} \{P_j^S \mid LB(P_j^S, G_i) \le U(P_j^S)\}$ (12)
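One greedy step based on the approximation in Equation 12 can be sketched as follows. The sketch assumes that the size of the approximate $RP(S, G_i)$ is obtained by summing the sizes of the selected partitions of S, and that ties are broken by the smallest partition id; the names `approx_rp` and `greedy_pick` are hypothetical.

```python
def approx_rp(group, lb, upper_s, s_sizes):
    """Approximate size of RP(S, G_i) following Equation 12.

    A whole partition P_j^S is counted as soon as LB(P_j^S, G_i) <= U(P_j^S).
    group:   list of R-partition ids forming G_i
    lb:      {(j, i): LB(P_j^S, P_i^R)}
    upper_s: {j: U(P_j^S)}
    s_sizes: {j: number of objects in P_j^S}
    """
    return sum(
        s_sizes[j]
        for j in upper_s
        if min(lb[(j, i)] for i in group) <= upper_s[j]
    )


def greedy_pick(group, candidates, lb, upper_s, s_sizes):
    """Pick the candidate R-partition whose addition to the group adds the fewest replicas."""
    base = approx_rp(group, lb, upper_s, s_sizes)
    return min(
        sorted(candidates),
        key=lambda c: approx_rp(group + [c], lb, upper_s, s_sizes) - base,
    )
```

Because $LB(P_j^S, G_i \cup \{P\}) = \min(LB(P_j^S, G_i), LB(P_j^S, P))$, the test only needs the precomputed pairwise lower bounds and never inspects individual objects of S.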

Remark: To answer a kNN join by exploiting the grouping strategies, we use the group id as the key of the map output. We omit the details, which are basically the same as described in Algorithm 3.

6. EXPERIMENTAL EVALUATION

We evaluate the performance of the proposed algorithms on our in-house cluster, Awan⁴. The cluster includes 72 computing nodes, each of which has one Intel X3430 2.4GHz processor, 8GB of memory, two 500GB SATA hard disks and gigabit ethernet. On each node, we install the CentOS 5.5 operating system, Java 1.6.0 with a 64-bit server VM, and Hadoop 0.20.2. All the nodes are connected via three high-speed switches. To adapt the Hadoop environment to our application, we make the following changes to the default Hadoop configurations: (1) the replication factor is set to 1; (2) each node is configured to run one map and one reduce task; (3) the size of virtual memory for each map and reduce task is set to 4GB.

We evaluate the following approaches in the experiments.

• H-BRJ is proposed in [21] and described in Section 3. In particular, to speed up the computation of the kNN join between $R_i$ and $S_j$, it employs an R-tree to index the objects of $S_j$ and finds the kNN for $\forall r \in R_i$ by traversing the R-tree. We used the implementation generously provided by the authors;

• PGBJ is our proposed kNN join algorithm that utilizes the partitioning and grouping strategy;

• PBJ is also our proposed kNN join algorithm. The only difference between PBJ and PGBJ is that PBJ does not have the grouping part. Instead, it employs the same framework used in H-BRJ. Hence, it also requires an extra MapReduce job to merge the final results.

We conduct the experiments using self-join on the following datasets:

• Forest FCoverType⁵ (Forest for short): This is a real dataset that predicts forest cover type from cartographic variables. There are 580K objects, each with 54 attributes (10 integer, 44 binary). We use the 10 integer attributes in the experiments.

• Expanded Forest FCoverType dataset: To evaluate the performance on large datasets, we increase the size of Forest while maintaining the same distribution of values over the dimensions of objects (like [16]). We generate new objects as follows: (1) we first compute the frequencies of values in each dimension, and sort the values in ascending order of their frequencies; (2) for each object o in the original dataset, we create a new object $\bar{o}$, where in each dimension $D_i$, $\bar{o}[i]$ is ranked next to o[i] in the sorted list. Further, to create multiple new objects based on object o, we replace o[i] with a set of values next to it in the sorted list for $D_i$. In particular, if o[i] is the last value in the list for $D_i$, we keep this value constant. We build the Expanded Forest FCoverType dataset by increasing the size of the Forest dataset from 5 to 25 times. We use “Forest×t” to denote the increased dataset, where $t \in [5, 25]$ is the increase factor.

⁴ http://awan.ddns.comp.nus.edu.sg/ganglia/

⁵ http://archive.ics.uci.edu/ml/datasets/Covertype

• OpenStreetMap⁶ (OSM for short): This is a real map dataset containing the location and description of objects. We extract 10 million records from this dataset, where each record consists of 2 real values (longitude and latitude) and a description of variable length.
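The expansion procedure described above for the Expanded Forest dataset can be sketched as follows. This is one possible reading, stated as an assumption: the t-th new object derived from o takes, in every dimension, the value t positions after o[i] in the frequency-sorted list, stopping at the last value of the list; values with equal frequency are ordered by value. The function name `expand` is hypothetical.

```python
from collections import Counter


def expand(objects, copies):
    """Return the original objects followed by `copies` shifted variants of each one.

    objects: list of equal-length tuples
    copies:  number of new objects generated per original object
    """
    dims = len(objects[0])
    ranked, rank = [], []
    for d in range(dims):
        freq = Counter(o[d] for o in objects)
        # Values in ascending order of frequency; ties broken by value (an assumption).
        order = sorted(freq, key=lambda v: (freq[v], v))
        ranked.append(order)
        rank.append({v: idx for idx, v in enumerate(order)})

    out = list(objects)
    for o in objects:
        for step in range(1, copies + 1):
            # Move `step` positions along each sorted list, staying at the last value.
            out.append(tuple(
                ranked[d][min(rank[d][o[d]] + step, len(ranked[d]) - 1)]
                for d in range(dims)
            ))
    return out
```

Under this reading, calling `expand(objects, t - 1)` produces a dataset t times the size of the original.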

By default, we evaluate the performance of kNN join (k is set to 10) on the “Forest×10” dataset using 36 computing nodes. We measure several metrics, including query time, distance computation selectivity, and shuffling cost. The distance computation selectivity (computation selectivity for short) is computed as follows:

$\frac{\#\text{ of object pairs to be computed}}{|R| \times |S|}$, (13)

where the objects also include the pivots in our case.
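As a small worked example of Equation 13, the following Python helper computes the selectivity from a count of distance computations; the function name and the numbers in the usage note are illustrative only and are not measurements from the experiments.

```python
def computation_selectivity(pairs_computed, r_size, s_size):
    """Equation 13: fraction of the |R| x |S| object pairs whose distance is computed."""
    return pairs_computed / (r_size * s_size)
```

For instance, if 5,000 distances are computed for |R| = |S| = 1,000, the selectivity is 5,000 / 1,000,000 = 0.005; a naive nested-loop join would have a selectivity of 1.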

6.1 Study of Parameters of Our Techniques

We study the parameters of PGBJ with respect to pivot selection strategy, pivot number, and grouping strategy. By combining different pivot selection and grouping strategies, we obtain 6 strategies, which are: (1) RGE, random selection + geometric grouping; (2) FGE, farthest selection + geometric grouping; (3) KGE, k-means selection + geometric grouping; (4) RGR, random selection + greedy grouping; (5) FGR, farthest selection + greedy grouping; (6) KGR, k-means selection + greedy grouping.

6.1.1 Effect of Pivot Selection Strategies

Table 2 shows the statistics of partition sizes using different pivot selection strategies, including random selection, farthest selection and k-means selection. We observe that the standard deviation (dev. for short) of partition size drops rapidly when the number of pivots increases. Compared to random selection and k-means selection, partition size varies significantly in the farthest selection. The reason is that in the farthest selection, outliers are always selected as pivots. Partitions corresponding to these pivots contain few objects, while other partitions whose pivots reside in dense areas contain a large number of objects. Specifically, when we select 2000 pivots using farthest selection, the maximal partition size is 1,130,678, which is about 1/5 of the dataset size. This large difference in partition size degrades performance due to the unbalanced workload. We also investigate the group size using the geometric grouping approach⁷. As shown in Table 3, the number of objects in each group varies significantly using the farthest selection. Again, this destroys the load balance, since each reducer needs to perform a significantly different volume of computation. However, the group sizes using random selection and k-means selection are approximately the same.

Figure 6 shows the execution time of the various phases of the kNN join. We do not provide the execution time for farthest selection because it takes more than 10,000s to answer the kNN join. The reason for the poor performance is that almost all the partitions of S overlap with large partitions of R; namely, we need to compute distances for a large number of object pairs. Comparing RGE with KGE, and RGR with KGR in Figure 6, we observe that the overall performance using random selection is better than that using k-means selection. Further, when the number of pivots increases, the gap in overall performance becomes larger. This is because k-means selection involves a large number of distance computations, which results in a long execution time. Things get worse when k increases.

⁶ http://www.openstreetmap.org

⁷ We omit the results for greedy grouping as they follow the same trend.

Table 1: Symbols and their meanings
Figure 1: An example of data partitioning
Figure 3: An overview of kNN join in MapReduce
Figure 4: Partitioning and building the summary tables
