Chapter 2 Related Work
2.5 Kademlia DHT
To store the information of the cached contents distributedly, a distributed hash table (DHT) is used. We adopted Kademlia for the purpose. Kademlia is a DHT system based on XOR metric. Each Kademlia node has a 160-bit identifier; each node chooses its identifier at random when joining the system. The keys used for the hash table mapping are also 160-bit identifiers, which we use SHA-1 hash function on the name of wanted file to generate. Given two identifiers x and y, the distance between them is the bitwise XOR result interpreted as an integer. The detailed operation will not be described here, but two major functions used in our system are PUT<key, value> and GET<key>. PUT<key, value> function stores the <key, value> pair on K nodes closest to the key, where K is a system parameter that can be adjusted. The GET<key> function retrieves the value associated with the given id, i.e., PUT<key, value> had been performed.
Chapter 3
System Design & Implementation
P2P time-shift streaming is similar to P2P VoD services, except that the size and duration of a VoD program can be pre-calculated, while those of time-shift video streaming can’t be done in advance. Therefore, caching mechanism is the major issue in the system.
3.1 System overview
By studying related research on P2P live streaming and P2P VoD streaming systems, we conclude that our system needs to cope with following issues: live streaming, content caching, publishing, searching and fetching, which we sorted to three major topics:
1. Live streaming framework
Live streaming framework provides a basis for this system, as time-shift streaming contents are provided by the contents that live streaming viewers had watched and cached in their local storage. The details of the applied live streaming framework based on DONet/Coolstreaming had been presented in Section 2.4.
2. Caching strategy and cache replacement policy
Live streaming nodes cache the contents they had watched to support time-shift streaming nodes, and thus the caching strategy is an important issue of the system design. Two factors, cached data redundancy and time-shift service span, were considered. It is clear that having all live streaming nodes caching all the contents they had watched provides the most data redundancy, but the shortest service span, because each node only has a limited storage space, and the time-shift service span hat a single node can provide is equal to the node’s storage space. On the other hand, having only
one replica in the system provides a storage space equal to the sum of all nodes’ storage space, but this provides poor data redundancy since the departure or failure of a node means the lose of data. Therefore, a mechanism keeping a balance between them is important, and thus we propose a probability algorithm to keep a desired number of replicas in the system.
3. Time-shift content search/fetching mechanism
The cached content must be located before it can be retrieved, we adopted Kademlia [17-18] distributed hash table (DHT) for content publishing and content search. With the published knowledge collected from the DHT, time-shift contents can be fetched from multiple sources in an efficient and load-balancing manner.
3.2 System architecture
Figure 3-1 System Architecture
Figure 3-1 depicts the architecture of our system. The system contains three types of components: bootstrap server, provider and viewer. The bootstrap server maintains a list of available channels and a list of participating nodes of each channel, in order to bootstrap the newly joined nodes. A provider is also a source node in the live streaming network and it registers its providing channel’s information with the bootstrap server.
Viewers first join the system with the help of the bootstrap server, and then retrieves the desired video contents for live streaming playback or time-shift playback.
Figure 3-2 System diagram of a node
Figure 3-2 depicts the system diagram of a node; the node can be a channel provider or viewer. The player-buffer relationship depends on the type of the node. For a provider, the player encodes the original video stream in to packet stream, and the stream data is put in its buffer for data transmission and genertaing its buffering status.
For a viewer, the video content is also put in its buffer for data transmission, generating buffering status and playback. To share video content among peers, the buffered content
can be transmitted through either live streaming mechanism or time-shift streaming mechanism. The live streaming part handles the content transmission for live streaming, and cooperates with the time-shift streaming part to cache and publish the contents.
Transmissions are carried out on TCP connections to avoid network layer data losses.
Kademlia DHT is used to published the cahed content, and its messagesare transmitted over UDP packets.
3.3 Transmission unit in streaming
The basic streaming flow of our system is depicted in Figure 3-3. The video stream is generated from the video source. A video server encodes the video stream into continuous packets and transmits to the viewer nodes. Each viewer node receives the video packets, and the video player decodes the received packets back to video. It is intuitive to replay the packets using a buffer-then-play scheme for both live and time-shift P2P streaming. However, since packet encoding is synchronized with the video, each generated packet will have its corresponding position on the timeline. The packet receiving pattern also needs be recorded for packet replay to rebuild the original video content, and thus each packet requires extra timing information. In our system, we record the duration of each packet, so the relative receiving pattern can be rebuilt.
Figure 3-3 Basic streaming
At the video source, the video is encoded into UDP packets by a VLC media player [19]. The UDP packets are then sent to the video server, which is a Provider node, via local loopback interface. The video server measures each packet’s duration. Since it is inefficient to track each packet individually, continuous packets received in a second are
packed into a block used in the system. Furthermore, in order to support time-shift streaming, 10 consecutive blocks, with the starting block’s timestamp is aligned to 10’s multiples, are packed into a file for local storage purpose. The file is named after the information given by the channel provider, with a readable format of timestamp; for example, file with name “ProviderName_Channel1_20100620182520” stands for 10 blocks of Channel 1 provided by ProviderName, with timestamp from 2010/06/20 18:25:20 to 2010/06/20 18:25:29. Figure 3-4 shows the structures of a block and a file.
Figure 3-4 The structures of a block and a file
3.4 Distributed cache management strategy
The goal of our distributed cache management strategy is to effectively keep a desired number of replicas for the cached contents. The strategy is composed of two parts: publishing/re-publishing policy and content caching based on probability.
1. Content publishing/re-publishing policy
After a video file is collected, the node will publish the cached content on the DHT.
However, the provider node works a little differently; it caches all contents but never publishes the ownership information. The purpose is to use the provider node as a backup node, and can only be accessed at emergency. For example, when a block is 5 seconds to the playback deadline but had not been received, or when there is no owner of the wanted content. Since the system will keep multiple replicas for each video file, the published record put into the DHT is a list of <IP, Port, Last_Update_Time> triples.
Fig.3-5 depicts the relation between a file name and its owner list found on the DHT with the structure of the list.
When a node wants to update a list, it first tries to get the list from the DHT. If the
list does not exist, it creates a new list. Then the node removes the record of two types:
(1) the record put by itself in the past that the node will update later, and (2) the out--of-date records that can be determined by comparing the records’
Last_Update_Time with the current time. In our system, we consider a record out-dated if the record is last updated more than thirty minutes ago. This thirty-minute interval could give the node enough time to do multiple updates, which we will mention later in this section. After removing the record , the node checks the size of the list, if the size has reach the desired number of replicas the system, the node deletes its cached file;
otherwise, it add its record to the list, and put the list back to the DHT. However, the accesses of the DHT from the peers are not coordinated, which means a published record may be overwritten by another node. Consider the following scenario. Node A and node B both wants to update the published list for file F. A gets the list, updates the list, and just before A put the list back to the DHT, B also gets the list, and updates the list. After that, A puts the list back to the DHT, but the list will be overwritten when B puts the list back to the DHT. As a result, A’s record is not stored in the list.
File name
Figure 3-5 Getting file owner list from DHT and list structure
To deal the synchronization issue, each node will back-off for a random interval before its publish operation to reduce such collisions, for the first time a node update a list, it will have a random back-off time uniformly distributed in [0, 50), at a 5-second stepping. A node also republishes its cached files. The republish operation is similar to the initial publish operation, but is done periodically in order to keep the lists up to date and to alleviate the effect of missed publishing. A node will periodically do the republish operation with a random back-off time uniformly distributed in [600, 1200), also at a 5-second stepping. As mentioned above, the records on DHT have a thirty
05 viewerKnowledge = MAX(parent.size()+partner.size(), viewerCount);
06 rand = a random integer generated between (0, viewerKnowledge]
07 if(rand < replicasRequired)
08 dump blocks to local storage;
09 random back-off for DHT publish;
10 fileOwnerList = DHT.get(filename);
11 remove out-dated entry and this node’s entry in fileOwnerList 12 if(DHT.get(filename).size() < replicasRequired)
13 FileOwnerList.add(this node);
19 end if 20 end while
Algorithm 3-1 Random Caching and Random Back-off 2. Caching based on probability
To distribute the responsibility of caching streaming contents and keep a desired number of replicas in the system, we adopted a probability algorithm to decide whether a file should be cached or not. Assume that the system wants to keep R replicas, and the system has N viewers. It is clear that each node should cache the received content with a probability of R/N. Since R is a constant, the discovery of N is the issue here.
To estimate N, first, a local knowledge based on the design of DONet/Coolstreaming is used. Since each node keeps connections with its partners and parents, these nodes must be active nodes in the system. Therefore, the node has the first parameter as the value of the number of partners plus the number of parents. In addition, the number of the current active viewers can be obtained by a modified node-startup procedure. When a node joins the system, heartbeat messages are periodically sent to the bootstrap server to update the membership cache, and the number of currently active viewers is piggybacked to the node in the replying messages. With the two values, N is selected as the larger one of the two. The local knowledge helps the node to react fast to the change of active nodes, especially when the size of viewer is small, since they replica control totally based on the content publishing/re-publishing mechanism.
3.5 Time-shift streaming
In time-shift streaming, we applied per-block pulling mechanism for content retrieval. After a node decides which channel it wants to watch, and where it wants to start playback, the name of the file containing the required content is known. By querying with the file name on the DHT, the node obtains the list of file owners. Then, a timer is started and for each interval of 1 second, the node will try to pull up to 4 blocks, each from a randomly selected owner in the list. The reason why there’s a limit on the number pulling blocks in each interval is that the available content cached may be much larger the buffer’s capacity, so that it is required to keep the pulling timestamp stay in a distance with the playback timestamp. For emergency handling, contents close to playback deadline but not received will be pulled directly from the provider.
3.6 System Implementation
We implemented the system in Java 1.6, based on request-reply model: node communicates with each other with request messages, and the recipient will reply with corresponding reply messages.
3.6.1 System Components
1. Bootstrap server
The bootstrap server creates a ServerSocket for incoming messaging connections, Thread’s are created for each incoming connection and received messages are handled and replied to the connecting node.
2. Provider/Viewer
Provider/Viewer node creates two ServerSocket’s, one for incoming messaging connections and the other for block transmission connections, Thread’s are created for each incoming connections. For incoming messaging connections, received messages are handled and replied to the connecting node. And for
incoming block transmission connections, received blocks are then transferred to this node’s buffer, where the block can be played or cached.
3.6.2 Message Format
Message contains its type and required options of that type of message. Fig 3-6 depicts basic message format. After a message has been generate, it is sent through TCP with Java Socket.
Figure 3-6 Message Format
3.6.3 Message Types
(1) Channel Registration
Channel providers registers its information with the bootstrap server, options including this node’s messaging port number, channel provider’s name and channel description. Bootstrap server replies with whether the registration is ok.
(2) Channel List
Viewer requests for available channels registered at bootstrap server, options including this node’s messaging port number, channel provider’s name and channel description. Bootstrap server replies with a list of available channels’ provider name and channel description.
(3) Channel Join
In live streaming, this message is used for channel joining procedure, which we had mentioned the joining procedure in 3.4.1, options including this node’s control port number, channel provider’s name and channel description.
Bootstrap server replies with a list of currently active nodes in the channel.
And in time-shift streaming, the message is used for DHT joining procedure,
where bootstrap server replies a DHT bootstrap node for the DHT bootstrap procedure.
(4) Buffermap Exchange
The message is used for buffer map information exchanges between nodes, options including this node’s control port number and buffer map. The recipient replies with its buffer map.
(5) Sub-stream Subscription
This message is used for sub-stream subscription, the options including this node’s messaging port number, block transmission port number, subscribing timestamp and its buffer map of subscribing sub-stream. The recipient replies with the subscription result.
(6) Sub-stream Un-subscription
This message is used for sub-stream un-subscription, the options including this node’s messaging port number and the index of the un-subscribing sub-stream. The recipient replies with the un-subscription result.
(7) Time-shift Block Request
This message is for time-shift streaming viewer nodes to request a block from other nodes, the options including this node’s messaging port number, block transmission port number and its requesting timestamp. The recipient replies with the requested result and (1) if it has the block, the requested block is sent to the requesting node, or (2) if it does not has the block, it tells the node to ask the server.
Chapter 4
Performance & Analysis
To evaluate the system performance, we performed experiments on PlanetLab, an open global research network [19].
4.1 Experiment Environment
The streaming server is located in the Internet Communication Laboratory, NCTU.
48 PlanetLab nodes were used as live streaming viewers, and 16 PlanetLab nodes were used as time-shift streaming viewers; most of them are located in the United States. The video is streamed at bit rate of 400 kbps, the number of sub-streams was set to be 8, and each node can connect to up to 24 other nodes as partners. The buffer size of each node is 120 blocks The random back-off time of first time publishing was uniformly distributed in [0, 50), at a 5-second stepping. The random back-off for republishing was uniformly distributed in [600, 1200), also at a 5-second stepping, 10 replicas would be kept in the system. Time-shift nodes cache each received block with a probability of 0.5.
Table 4-1 lists the system parameters used in our system.
Table 4-1 System Parameters
System Parameter Value
Video streaming bit-rate 400 kbps
The number of sub-streams 8
The maximum number of partners 24 The number of replicas to keep 10
Buffer size 120 blocks
Random back-off for the first time publishing
0~50 second, stepping 5 seconds
The random back-off for re-publishing 10~20 minutes, stepping 5 seconds
In both trials, we first started the bootstrap server and streaming provider, and then all 64 nodes joined the system as a Poisson process, with the inter-arrival time set to be 60 seconds. For live streaming nodes, after the exchange of block availability information, our heuristic is to set the node’s starting timestamp for playback to be the smallest timestamp in the received availability information plus the number of sub-streams. For each time-shift node, it randomly selected a time between the time when the streaming had started and the time it joined the system to start to playback.
The experiment lasted 2 hours, and we assumed no peer churn. We will examine the performance of both proposed methods: without the information of the number of currently active nodes in the system and with the information of the number of currently active nodes in the system. The results will be placed on each figure’s upper side and lower side, respectively.
4.2 System Performance and Analysis
4.2.1 The live streaming
First, we examine three commonly used criterions in evaluating a streaming service: startup delay, end-to-end delay and playback continuity. The startup delay is the time between when a user tunes to a channel, and when the video content is visible.
End-to-end delay, also called playback delay, is the delay of the video content between the viewer and the source. Continuity index is the number of segments that arrive before or on playback deadlines over the total number segments a node should have received.
Figure 4-1 The Distribution of Startup Delay
Figure 4-2 The Distribution of End-to-End Delay
Figure 4-3 Live Streaming - Continuity Index Diagram
Figure 4-1 depicts the startup delay in our system. The average delay has an
average of 13.19 seconds in the first trial that is without the information of the number of currently active nodes, and 15.75 in the secondtrial that uses the information of the number of currently active nodes. The end-to-end delay, as depicted in Figure 4-2, has an average of 94.33 seconds in the first trial, and 116.46 in the second trial. The continuity index is 99.00% and 98.46%, as shown in fig. 4-3. However, there are two nodes failed to join the system, because they were unable to contact with the bootstrap
average of 13.19 seconds in the first trial that is without the information of the number of currently active nodes, and 15.75 in the secondtrial that uses the information of the number of currently active nodes. The end-to-end delay, as depicted in Figure 4-2, has an average of 94.33 seconds in the first trial, and 116.46 in the second trial. The continuity index is 99.00% and 98.46%, as shown in fig. 4-3. However, there are two nodes failed to join the system, because they were unable to contact with the bootstrap