• 沒有找到結果。

Registering Workers

} };

We don’t have any data to put in these znodes, so we are just passing an empty byte array.

Because of that, we don’t have to worry about keeping track of the data that corresponds to each znode, but often there is data unique to a path, so we will track the data using the callback context of each create call. It may seem a bit strange that we pass data in both the second and fourth parameters of create, but the data passed in the second parameter is the data to be written to the new znode and the data passed in the fourth will be made available to the create ParentCallback.

If the callback gets a CONNECTIONLOSS return code, we want to simply retry the create, which we can do by calling createPath. However, to call createPath we need the data that was used in the original create. We have that data in the ctx object that was passed to the callback because we passed the creation data as the fourth parameter of the create. Because the context object is separate from the callback object, we can use a single callback object for all of the creates.

In this example you will notice that there isn’t any difference between a file (a znode that contains data) and a directory (a znode that contains children). Every znode can have both.

Registering Workers

Now that we have a master, we need to set up the workers so that the master has someone to boss around. According to our design, each worker is going to create an ephemeral znode under /workers. We can do this quite simply with the following code. We will use the data in the znode to indicate the state of the worker:

import java.util.*;

import org.apache.zookeeper.AsyncCallback.DataCallback;

import org.apache.zookeeper.AsyncCallback.StringCallback;

import org.apache.zookeeper.AsyncCallback.VoidCallback;

import org.apache.zookeeper.*;

private static final Logger LOG = LoggerFactory.getLogger(Worker.class);

ZooKeeper zk;

String hostPort;

String serverId = Integer.toHexString(random.nextInt());

Worker(String hostPort) { this.hostPort = hostPort;

}

void startZK() throws IOException {

zk = new ZooKeeper(hostPort, 15000, this);

}

public void process(WatchedEvent e) {

LOG.info(e.toString() + ", " + hostPort);

}

void register() {

zk.create("/workers/worker-" + serverId, "Idle".getBytes(),

Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, createWorkerCallback, null);

}

StringCallback createWorkerCallback = new StringCallback() { public void processResult(int rc, String path, Object ctx,

} };

public static void main(String args[]) throws Exception { Worker w = new Worker(args[0]);

We will be putting the status of the worker in the data of the znode that represents the worker.

If the process dies we want the znode representing the worker to get cleaned up, so we use the EPHEMERAL flag. That means that we can simply look at the children of /workers to get the list of available workers.

Because this process is the only one that creates the ephemeral znode representing the process, if there is a connection loss during the creation of the znode, it can simply retry the creation.

As we have seen earlier, because we are registering an ephemeral node, if the worker dies the registered znode representing that node will go away. So this is all we need to do on the worker’s side for group membership.

We are also putting status information in the znode representing the worker. This allows us to check the status of the worker by simply querying ZooKeeper. Currently, we have only the initializing and idle statuses; however, once the worker starts actually doing things, we will want to set other status information.

Here is our implementation of setStatus. This method works a little bit differently from methods we have seen before. We want to be able to set the status asynchronously so that it doesn’t delay regular processing:

StatCallback statusUpdateCallback = new StatCallback() {

public void processResult(int rc, String path, Object ctx, Stat stat) {

synchronized private void updateStatus(String status) { if (status == this.status) {

zk.setData("/workers/" + name, status.getBytes(), -1, statusUpdateCallback, status);

} }

public void setStatus(String status) { this.status = status;

updateStatus(status);

}

We save our status locally in case a status update fails and we need to retry.

Rather than doing the update in setStatus, we create an updateStatus method that we can use in setStatus and in the retry logic.

There is a subtle problem with asynchronous requests that we retry on connection loss: things may get out of order. ZooKeeper is very good about maintaining order for both requests and responses, but a connection loss makes a gap in that ordering, because we are creating a new request. So, before we requeue a status update, we need to make sure that we are requeuing the current status; otherwise, we just drop it. We do this check and retry in a synchronized block.

We do an unconditional update (the third parameter; the expected version is –1, so version checking is disabled), and we pass the status we are setting as the context object.

If we get a connection loss event, we simply need to call updateStatus with the status we are updating. (We passed the status in the context parameter of set Data.) The updateStatus method will do checks for race conditions, so we do not need to do those here.

To understand the problems with reissuing operations on a connection loss a bit more, consider the following scenario:

1. The worker starts working on task-1, so it sets the status to working on task-1.

2. The client library tries to issue the setData, but encounters networking problems.

3. After the client library determines that the connection has been lost with ZooKeeper and before statusUpdateCallback is called, the worker finishes task-1 and be‐

comes idle.

4. The worker asks the client library to issue a setData with Idle as the data.

5. Then the client processes the connection lost event; if updateStatus does not check the current status, it would then issue a setData with working on task-1.

6. When the connection to ZooKeeper is reestablished, the client library faithfully issues the two setData operations in order, which means that the final state would be working on task-1.

By checking the current status before reissuing the setData in the updateStatus meth‐

od, we avoid this scenario.

Order and ConnectionLossException

ZooKeeper is very strict about maintaining order and has very strong ordering guarantees. However, care should be taken when thinking about ordering in the presence of multiple threads. One common sce‐

nario where multiple threads can cause errors involves retry logic in callbacks. When reissuing a request due to a ConnectionLossExcep tion, a new request is created and may therefore be ordered after requests issued on other threads that occurred after the original request.