• 沒有找到結果。

Creating a recommendation engine

在文檔中 Storm Real-time (頁 195-200)

A recommendation engine makes intelligent guesses as to what a customer may want to buy based on previous lists of products, which has been made famous by leaders such as Amazon. These lists may be from a current selection within the context of the current session.

The list of products may be from previous purchases by the particular customer, and it may even simply be the products that the customer has viewed within a given session. Whichever approach you choose, the training data and scoring data during operational phases must follow the same principles.

In this recipe, we will use the association rules model from the previous recipe to create a recommendation engine. The concept behind the engine is that lists are supplied as asynchronous inputs and recommendations are forwarded as asynchronous outputs where applicable.

There are product combinations that aren't strongly supported by the model;

in these cases, no recommendation is emitted. If you need a recommendation for every single input, you could choose to emit a random recommendation when there is no strongly supported recommendation, or you could choose to improve your model through better and generally larger training datasets.

How to do it…

1. Start by creating a Maven project called arules-topology and add the following dependencies:

<dependency>

<groupId>com.github.quintona</groupId>

<artifactId>trident-kafka-push</artifactId>

<version>1.0-SNAPSHOT</version>

</dependency>

<dependency>

<groupId>storm</groupId>

<artifactId>storm-kafka</artifactId>

<version>0.9.0-wip16b-scala292</version>

</dependency>

<dependency>

<groupId>com.github.quintona</groupId>

<artifactId>storm-r</artifactId>

<version>0.0.1-SNAPSHOT</version>

</dependency>

2. Next, create a main topology class called RecommendationTopology using the idiomatic Storm main method. For this recipe, we will be receiving the product list as a JSON array on a Kafka topic. We will therefore need to coerce the byte array input into a tuple containing two separate values, one being the transaction ID and the other being the list of products, as shown in the following lines of code:

public static class CoerceInFunction extends BaseFunction { @Override

List<String> values = new ArrayList<String>(array.

3. We will also need to publish the output message using the Kafka partition persist.

The recommendation and transaction ID need to be coerced into a single value consisting of a JSON array as follows:

public static class CoerceOutFunction extends BaseFunction { @Override

public void execute(TridentTuple tuple, TridentCollector collector) {

JSONObject obj = new JSONObject();

obj.put("transaction-id", tuple

.getStringByField("transaction-id"));

obj.put("recommendation", tuple.getStringByField(

"recommendation"));

collector.emit(new Values(obj.toJSONString()));

} }

4. We then need to define the topology as described here:

topology.newStream("kafka",

new TransactionalTridentKafkaSpout(spoutConfig)) .each(new Fields("bytes"), new

new KafkaState.Options()), new Fields("message"),

new KafkaStateUpdater("message"), new Fields());

5. The Storm-R project's standard function supports only a known input array size. This works for most use cases; however, for the association case, the input size will vary for each tuple. It is therefore necessary to override the execute function to cater for this particular case as shown here:

public static class ListRFunction extends RFunction { public ListRFunction(List<String> libraries, String functionName) {

super(libraries, functionName);

}

@Override

public void execute(TridentTuple tuple, TridentCollector collector) {

List<String> items = (List<String>) tuple.get(0);

JSONArray functionInput = new JSONArray();

functionInput.addAll(items);

JSONArray result = performFunction(functionInput);

if(result != null)

collector.emit(coerceResponce(result));

} }

6. These elements are all that is required to create the recommendation engine. You can now start your topology in local mode from Eclipse. In order to test it, a test script is provided with the chapter code bundle named sendSelection.py. This takes a single parameter, which is the number of transactions, to publish onto the queue as follows:

python sendSelection.py 1000

7. You can view the output recommendations by issuing the following command from the Kafka command line:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic recommendation-output --from-beginning

How it works...

At the fundamental level, Storm-R works in a very similar way to the multilang approach that we investigated in detail in Chapter 5, Polyglot Topology. For each instance of the function, it creates an R process that it interacts with via Studio. However, Storm-R cuts much of the complexity and overhead out because it doesn't make use of a generalized protocol; it is specific to R. This makes it much faster and simpler.

The key assumption behind Storm-R is that a single R function will be called from the Storm function and will pass a vector as input. The R function will return a vector as output. This assumption nicely aligns the function interface with the concept of a tuple. The Storm-R module further simplifies the R implementation by auto-generating some code to take care of the marshaling and unmarshaling of the vector across the interface.

As with a standard Trident function, any lengthy functions should execute during the

preparation phase of the Storm function. The Storm-R function expects to be initialized with a function name and a list of R libraries to load during the preparation phase. It can also take an R script that should be executed during the preparation phase.

The R function that is to be called needs to be available within the R session. This means that the R function needs to be supplied in a prebuilt package, or it can be defined in the initialization script. In this recipe, we are using a function that is defined in the initialization script, but specifically we are using a prepackaged initialization script that ships with Storm-R.

Let's unpack the R function definition within the topology with the following code snippet:

.each(new Fields("current-list"), new ListRFunction(Arrays.asList(new String[] { "arules" }), "recommend")

.withNamedInitCode("recommend"), new Fields("recommendation")) Firstly, the constructor receives a list of libraries to load into R; in this case, just the arules package. The constructor also receives a function name called recommend. This is the R function that will receive a vector input and must return a vector output. Secondly, the withNamedInitCode method is called and the name recommend is supplied. What this function does is look up a script on the classpath by appending .R to the name supplied.

Therefore, in this case, a script named /recommend.R is present at the root of the classpath.

This means that you can supply any script there, but in this case, we will use the one that Storm-R brings to the classpath. There is also an overloaded method that allows you to supply the script's contents as a string.

Let's take a look at the contents of the recommend script as shown here:

data("Groceries")

rules <- apriori(Groceries, parameter = list(supp = 0.001, conf = 0.8) recommend <- function(list){

rules.found <- subset(rules, subset = lhs %ain% list & lift > 1.3) as(rhs(rules.found), "list")

}

You will notice some things from the previous recipe. Essentially, this script is loading the dataset and building the rules from it. It is then defining a function called recommend that takes a vector as input and uses it to search the rules for any found matches where the lift is greater than 1.3 and the LHS is the supplied input list. It then converts the RHS of the found rules into a vector and returns.

There are three ways to match using the subset function: Any, All, or Partial. This example uses All (%ain%); however, you could choose a less script-matching criteria such as Any(%in%) or Partial (%pin%).

The net effect is that at the time of Trident function preparation, the library will be loaded, and this function will be defined, making it available for all subsequent calls in the session. All subsequent calls will originate from the execute method and contain the values from tuple.

In this recipe, we overrode the execute method of RFunction. The reason, as stated, is that the standard implementation expects a fixed set of values in the tuple. This becomes more apparent when you compare the implementations. The execute method from RFunction consists of the following code:

JSONArray functionInput = coerceTuple(tuple);

JSONArray result = performFunction(functionInput);

if(result != null)

collector.emit(coerceResponce(result));

Whereas the overridden version consists of the following code:

List<String> items = (List<String>) tuple.get(0);

JSONArray functionInput = new JSONArray();

functionInput.addAll(items);

JSONArray result = performFunction(functionInput);

if(result != null)

collector.emit(coerceResponce(result));

As you can see, the difference is subtle but important. The first instance uses the values of the tuple as the input to the R function. The second instance uses the content of one of the values of the tuple, which is an array itself. In this way, we can support any length of products.

在文檔中 Storm Real-time (頁 195-200)