A Distributed System for Answering Range Queries on Sensor Network Data∗

Alfredo Cuzzocrea1, Filippo Furfaro1, Sergio Greco1, Elio Masciari2, Giuseppe M. Mazzeo1, Domenico Saccà2

1: DEIS - Università della Calabria, via P. Bucci, 87030 Rende, Italy
2: ICAR - CNR, via P. Bucci, 87030 Rende, Italy

[email protected]
[email protected]

∗ This work was supported by a grant from the Italian Research Project FIRB "Enabling ICT Platforms for Distributed High-Performance Computational Grids", funded by MIUR and coordinated by the National Research Council (CNR).
Abstract

A distributed system for approximate query answering on sensor network data is proposed, where a suitable compression technique is exploited to represent data and support query answering. Each node of the system stores either detailed or summarized sensor readings. Query answers are computed by identifying the set of nodes that contain (either compressed or uncompressed) data involved in the query, and possibly partitioning the query into a set of sub-queries to be evaluated at different nodes. Queries are partitioned according to a cost model that aims at making the evaluation efficient while guaranteeing the desired degree of accuracy of the query answers.
1 Introduction
Sensor networks are non-traditional sources of information which cannot be efficiently managed and queried by traditional techniques and architectures [2, 6, 7, 8]. A frequent approach to the problem of representing sensor data and extracting useful information from collected data is based on (lossy) data compression [3, 4]: data are stored in a compressed form, and queries are evaluated on the compressed data rather than on the original ones. This approach provides approximate query answers, but the reduced amount of information to be processed allows a much more efficient evaluation (which is the main objective in many application contexts). In fact, approximation is often tolerable, especially for aggregate queries which involve "old" data. Indeed, a detailed representation of recent data enables an effective reaction to the monitored world, whereas an approximate representation of old information often suffices to generate useful reports. For instance, the accuracy of data describing the traffic of a network becomes less relevant for preventing failures as the time interval the data refer to grows older; however, approximate aggregations of older data can be exploited to compute statistics useful for comparative analysis. In [3] a compression technique inspired by this guideline, which supports efficient query answering on sensor data streams, is presented.

In this paper a distributed architecture for storing sensor network data and supporting fast approximate (aggregate) query answering is presented. In this system, data coming from sensors can be stored into one or more nodes. Each node is interested in the readings coming from a set of sensors (for instance, sensors belonging to the same geographical region): it stores new data into a traditional temporal DBMS and, at the same time, updates a compressed representation of the data, which can be used for fast query answering when high accuracy is not required. Moreover, each node can store a (compressed) snapshot of the data stored in other nodes, which is updated periodically. The adopted compression technique is the one introduced in [3]. The possibility of storing the same information in different nodes of the system (possibly with different compression ratios) improves query answering efficiency: queries submitted by a client, which specifies the desired answer accuracy, can be evaluated by partitioning them into sub-queries and submitting each sub-query to a different node. The partitioning of the query is performed according to a cost model which takes into account several factors, such as the accuracy of the data available in each node involved in the query and its current traffic.
2 Representing Sensor Data
In the rest of the paper we adopt the representation model for sensor data introduced in [3], which is defined as follows. Consider an ordered set of n sources (i.e. sensors)
denoted by {s1, . . . , sn}, producing n independent streams of data representing sensor readings. Each data stream can be viewed as a sequence of triplets ⟨ids, v, ts⟩, where: 1) ids ∈ {1, . . . , n} is the source identifier; 2) v is a non-negative integer value representing the measure produced by the source identified by ids; 3) ts is a timestamp, i.e. a value that indicates the time when the reading v was produced by the source ids. Instead of representing sensor readings as distinct tuples, a multi-dimensional representation is adopted in order to support data compression and efficient query answering. In particular, the sensor data stream is represented by means of a two-dimensional array, where the first dimension corresponds to the set of sources and the other one corresponds to time, as shown in Fig. 1. The time dimension is divided into intervals ∆tj of the same size. Each element ⟨si, ∆tj⟩ of the array is the sum of all the values generated by the source si whose timestamp is within the time interval ∆tj.
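To make the bucketing concrete, here is a minimal sketch (function and variable names are hypothetical, assuming a fixed granularity ∆t and integer source identifiers) of how a stream of ⟨ids, v, ts⟩ triplets could be accumulated into the two-dimensional array just described.

```python
# Sketch: accumulating sensor readings into the two-dimensional array
# described above. Names and the fixed granularity are illustrative.

def build_array(readings, n_sources, t_start, t_end, delta_t):
    """readings: iterable of (id_s, v, ts) triplets, with 1 <= id_s <= n_sources.
    Returns a list of lists A where A[i][j] is the sum of the values produced
    by source s_{i+1} within the j-th time interval of width delta_t."""
    n_intervals = (t_end - t_start + delta_t - 1) // delta_t  # ceiling division
    A = [[0] * n_intervals for _ in range(n_sources)]
    for id_s, v, ts in readings:
        if t_start <= ts < t_end:
            A[id_s - 1][(ts - t_start) // delta_t] += v
    return A

# Example: 3 sources, time window [0, 60) split into intervals of 20 time units.
stream = [(1, 5, 3), (1, 2, 15), (2, 7, 21), (3, 4, 59)]
print(build_array(stream, n_sources=3, t_start=0, t_end=60, delta_t=20))
# [[7, 0, 0], [0, 7, 0], [0, 0, 4]]
```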
Figure 1. Two-dimensional representation of sensor data streams

Obviously, the use of a time granularity generates a loss of information, as the readings of a sensor belonging to the same time interval are aggregated. However, if a time granularity appropriate for the particular context monitored by the sensors is chosen, the loss of information is negligible.

We point out that considering the set of sources as an ordered set implies the assumption that the sensors in the network can be organized according to a linear ordering. Whenever no implicit linear order among the sources can be found (for instance, when sources are identified by a geographical location), a mapping should be defined between the set of sources and a one-dimensional ordering. This mapping should be closeness-preserving, that is, sensors which are "close" in the network should be close in the linear ordering. Obviously, it is not always possible to define a linear ordering such that no information about the "relative" location of the sources w.r.t. one another is lost. It can happen that two sources which can be considered contiguous in the network are not located in contiguous positions according to the linear ordering criterion. In this case, a range query involving a set of contiguous sensors in the network is possibly translated into more than one range query on the linear ordering used to represent the whole set of sources. Alternatively, when a "meaningful" linear ordering of the sensors cannot be found, the representation model can be extended to a three-dimensional setting, where two dimensions are used to locate sensors and the third dimension is used to represent time.

2.1 Range queries on data streams
The main goal of this work is to define a distributed architecture for effectively representing sensor readings and efficiently answering aggregate queries issued on sensor data. There are several contexts where solving the problem of aggregating the values produced by a subset of sources within a time interval is crucial for the effectiveness of applications. More formally, this means answering a range query on the overall stream of data generated by s1, . . . , sn. A range query is a pair Q = ⟨[si..sj], [tstart..tend]⟩ whose answer is the evaluation of an aggregate operator (such as sum, count, avg, etc.) on the values produced by the sources si, si+1, . . . , sj within the time interval [tstart..tend]. An example of range query is reported in Fig. 1.
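As a minimal illustration of the exact (uncompressed) case, assuming the array produced by the previous sketch and expressing the time interval through the indexes of the aggregated intervals ∆tj, a sum range query reduces to summing a rectangular sub-block of the array; the helper name is hypothetical.

```python
def sum_range_query(A, s_lo, s_hi, t_lo, t_hi):
    """Sum of A over sources s_lo..s_hi (1-based, to match the s_1..s_n notation)
    and time-interval indexes t_lo..t_hi (0-based), all bounds inclusive."""
    return sum(A[i][j] for i in range(s_lo - 1, s_hi) for j in range(t_lo, t_hi + 1))

A = [[7, 0, 0], [0, 7, 0], [0, 0, 4]]
print(sum_range_query(A, s_lo=1, s_hi=2, t_lo=0, t_hi=1))  # 14
```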
2.2 Querying Compressed Data Streams
As mentioned above, exact answers to aggregate queries are often not necessary, as approximate answers usually suffice to get useful reports on the content of data streams and to provide a meaningful description of the world monitored by the sensors. In [3] a technique is proposed for compressing sensor data streams and providing (fast) approximate answers to aggregate queries on the compressed data. According to this technique, sensor readings are stored into a two-dimensional array (which is the physical representation of the model explained above), which is dynamically updated as new readings arrive. In particular, as new data arrive, if the available storage space is not enough for their representation, "old" data are compressed (or possibly removed) to release the storage space needed to represent the new readings. The compression of old data is obtained by summarizing them, that is, by aggregating readings with contiguous timestamps generated by sensors which are close to one another according to the ordering criterion. The loss of detail in representing old data is progressive: as time goes by, old data are more and more summarized (we start by summarizing readings generated by pairs of sensors, then we summarize readings generated by groups of 4 sensors, and so on). By adopting this strategy, the compressed representation of the data stream consists of an indexed set of values, where each value represents either a single reading or the aggregation
of a set of readings. Therefore, given a sum range query over ⟨[si..sj], [ts..te]⟩, the answer to this query can be obtained by accessing the compressed data and summing two contributions. The first one is given by the sum of those values which represent ranges completely contained inside the range of the query (i.e. ranges of the form ⟨[si′..sj′], [ts′..te′]⟩, where si ≤ si′ ≤ sj′ ≤ sj and ts ≤ ts′ ≤ te′ ≤ te). The second one is given by the values corresponding to ranges which partially overlap the range of the query (i.e. ranges of the form ⟨[si′..sj′], [ts′..te′]⟩ such that [si′..sj′] intersects [si..sj] and [ts′..te′] intersects [ts..te], without being completely contained in them). The latter contribution can be evaluated by performing linear interpolation, i.e. assuming that the data distribution inside the ranges described by aggregate values is "homogeneous" (the Continuous Values Assumption - CVA). Obviously, this estimation technique introduces some approximation, unless the summarized data are actually homogeneous.
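The following sketch (illustrative only; it does not reproduce the actual data structure of [3]) shows the CVA estimation rule for these contributions: each aggregated value contributes in proportion to the fraction of its range covered by the query, which reduces to the full value when the range is completely contained in the query.

```python
# Sketch of CVA-based estimation: each stored value summarizes a rectangular
# range of (sources x time intervals); its contribution to a sum query is
# scaled by the overlapping fraction of that rectangle.

def overlap(lo1, hi1, lo2, hi2):
    """Length of the intersection of the integer ranges [lo1..hi1] and [lo2..hi2]."""
    return max(0, min(hi1, hi2) - max(lo1, lo2) + 1)

def estimate_sum(aggregates, q_s_lo, q_s_hi, q_t_lo, q_t_hi):
    """aggregates: list of (s_lo, s_hi, t_lo, t_hi, value) summarizing disjoint ranges."""
    answer = 0.0
    for s_lo, s_hi, t_lo, t_hi, value in aggregates:
        area = (s_hi - s_lo + 1) * (t_hi - t_lo + 1)
        ov = overlap(s_lo, s_hi, q_s_lo, q_s_hi) * overlap(t_lo, t_hi, q_t_lo, q_t_hi)
        answer += value * ov / area   # full contribution if completely contained
    return answer

# One aggregate covering sources 1..4 and intervals 0..3 with total value 80;
# a query touching sources 1..2 and intervals 0..1 gets 80 * 4/16 = 20.
print(estimate_sum([(1, 4, 0, 3, 80)], 1, 2, 0, 1))  # 20.0
```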
3 A Distributed Framework for Storing and Querying Sensor Network Data
In this section we describe our distributed architecture for storing sensor network data and supporting fast approximate (aggregate) query answering. The architecture is sketched in Fig. 2, where the following entities are represented:

- Sensor: the basic data source of our system;
- Sensor Domain: a collection of sensors which are semantically related to one another (for instance, the temperature sensors of the same region);
- Stream Source: the host collecting the readings coming from a certain sensor domain;
- Data Dispatcher: a service which receives sensor readings from Stream Sources and forwards them to the Sensor Stream Databases which are "interested" in these readings;
- Sensor Stream Database (SSD): a database which stores the readings coming from a certain set of sensor domains (for instance, SSD1 stores the readings of Sensor Domain 1, SSD2 stores the readings of Sensor Domains 2 and 3, and so on). Furthermore, it contains compressed information (namely, Snapshots) on data generated by sensors belonging to other sensor domains;
- SSD Locator: a service which accesses the catalog of SSDs, where the associations between SSDs and Sensor Domains are stored;
- Query Dispatcher: a service which processes range queries by locating the SSDs containing the data involved in the queries, and selecting the SSDs to be accessed in order to evaluate the queries effectively (i.e. with a reasonable accuracy) and efficiently;
- Snapshot Catalog: a service which accesses the catalog of snapshots; this catalog stores, for each SSD, information about the snapshots which are available in it;
- Snapshot Agent: an agent executing the snapshot policy by (periodically) updating the compressed data stored in the SSDs (further details are given in the following).

Briefly, our system works as follows. Sensor readings are stored into the Sensor Stream Databases. Each SSD is associated with a set of Sensor Domains, and contains both an uncompressed and a compressed version of the data coming from the related Sensor Domains. Data are compressed using the technique presented in [3], which has been briefly summarized in Section 2.2. As previously explained, compressed data can be used when highly accurate answers are not needed and fast answers are preferred. In more detail, sensor readings coming from a given Sensor Domain SD are collected by the Stream Source SS associated with SD. Then, SS invokes a Data Dispatcher DD which forwards the incoming readings to the right SSDs (i.e. the SSDs storing all data coming from SD). In particular, DD locates these SSDs by invoking the SSD Locator. As an SSD receives new information, it updates both the detailed and the compressed representation of its data. Each SSD can also contain compressed information on sensor readings coming from Sensor Domains other than the ones associated with it. That is, the information stored in an SSD is possibly replicated in other SSDs, but with a different degree of accuracy (i.e. a different compression ratio). In the following sections we explain in more detail how compressed data are updated (Section 3.1) and how query execution plans are scheduled (Section 3.2).
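Before turning to these details, the sketch below illustrates the dispatch flow just described; all class and method names are hypothetical and not part of the actual implementation.

```python
# Illustrative sketch of the dispatch flow; names are hypothetical.

class SSDLocator:
    """Catalog mapping each Sensor Domain to the SSDs that store its readings."""
    def __init__(self, domain_to_ssds):
        self.domain_to_ssds = domain_to_ssds

    def locate(self, domain_id):
        return self.domain_to_ssds.get(domain_id, [])

class SensorStreamDatabase:
    def __init__(self, name):
        self.name = name
        self.detailed = []      # uncompressed readings

    def store(self, domain_id, reading):
        self.detailed.append((domain_id, reading))
        # ... here the compressed representation would also be updated ...

class DataDispatcher:
    def __init__(self, locator):
        self.locator = locator

    def dispatch(self, domain_id, readings):
        for ssd in self.locator.locate(domain_id):
            for reading in readings:
                ssd.store(domain_id, reading)

ssd1, ssd2 = SensorStreamDatabase("SSD1"), SensorStreamDatabase("SSD2")
locator = SSDLocator({1: [ssd1], 2: [ssd2], 3: [ssd2]})   # SSD2 serves domains 2 and 3
DataDispatcher(locator).dispatch(2, [(7, 42, 1000)])       # a (id_s, v, ts) reading
print(ssd2.detailed)  # [(2, (7, 42, 1000))]
```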
3.1 Updating Compressed Data
Consider two Sensor Stream Databases SSD1 and SSD2, such that SSD1 contains a compressed representation of the data stored in SSD2. The Snapshot Agent periodically accesses SSD2, extracts a compressed version of its up-to-date data, and sends it to SSD1, which replaces its old compressed version of the SSD2 data with the new one. The Snapshot Agent accesses the Snapshot Catalog to check which pairs ⟨SSDa, SSDb⟩ have a Snapshot Contract. Basically, a Snapshot Contract between SSDa and SSDb states that SSDa is authorized to extract compressed information from SSDb, and contains parameters such as the frequency of updates (how often may SSDa access SSDb to retrieve its data?) and the compression ratio (how accurate must the snapshot that SSDa extracts from SSDb be?).
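A possible shape of the catalog entries and of the agent's refresh step is sketched below; the names and the extract_snapshot/install_snapshot callables are hypothetical placeholders for the compression and storage machinery of [3].

```python
import time
from dataclasses import dataclass

# Sketch of a Snapshot Contract and of the Snapshot Agent's refresh step;
# all names are illustrative.

@dataclass
class SnapshotContract:
    consumer: str             # the SSD authorized to extract the snapshot (SSD_a)
    provider: str             # the SSD the snapshot is extracted from (SSD_b)
    refresh_period: float     # how often the consumer may refresh, in seconds
    compression_ratio: float  # how accurate the extracted snapshot must be
    last_refresh: float = 0.0

def refresh_due_snapshots(contracts, extract_snapshot, install_snapshot, now=None):
    """For each contract whose refresh period has elapsed, pull a compressed
    snapshot from the provider and install it at the consumer."""
    now = time.time() if now is None else now
    for c in contracts:
        if now - c.last_refresh >= c.refresh_period:
            snapshot = extract_snapshot(c.provider, c.compression_ratio)
            install_snapshot(c.consumer, c.provider, snapshot)
            c.last_refresh = now
```

A real agent would run this step periodically against the contracts stored in the Snapshot Catalog.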
Figure 2. System Architecture
3.2 Query execution plan
Range queries issued by clients are processed by the Query Dispatcher, which accesses both the SSD Locator and the Snapshot Catalog to locate the SSDs which contain either uncompressed or compressed data involved in the query. Then, the Query Dispatcher selects the most "convenient" SSDs to be accessed in order to evaluate the query. The choice of the most convenient SSDs is performed by using a cost model in which several parameters are considered, such as the degree of accuracy required by the client, the degree of accuracy of the compressed data available in the different nodes, and the usage percentage of each SSD, all of these parameters being available in the SSD Locator and in the Snapshot Catalog. A range query is evaluated by partitioning it into a set of sub-queries, and then evaluating these sub-queries by accessing different SSDs. The aim is to parallelize the query answering process as much as possible, while constraining the estimated error to be lower than the accuracy threshold set by the client and avoiding SSDs whose usage percentage is too large. The estimated error for a query Q partitioned into the set of sub-queries {q1, . . . , qn} is obtained by summing the estimated errors of the sub-queries qi. In particular, the estimated error of qi depends on the SSD where it is evaluated, as different SSDs can represent the same portion of the data stream using different compression ratios. However, it is not possible to estimate the accuracy only on the basis of the knowledge of the compression ratio, as the accuracy of the answer of a range query evaluated on the compressed representation of a data stream depends on several parameters. As shown in [3], the more "homogeneous" the original data distribution, the more accurate the query answer;
moreover, the relative error of the query answer decreases as the query size increases. Therefore, in order to enable the accuracy to be easily and effectively estimated, for each SSDi and each compressed data stream stored in SSDi a statistical report is published, where average relative errors for samples of queries of different size are summarized. In this report (namely, the Estimated Error Report) average relative errors are grouped by query size, so that the error for a query qi on a given compressed data stream evaluated on SSDi can be estimated by accessing this synopsis and looking up the average relative error for queries having the size closest to qi and issued on the same compressed data as qi. If we denote the estimated relative error for qi as erri, the answer of Q as ans(Q), and the sizes of qi and Q as size(qi) and size(Q), respectively, then the estimated absolute error of qi can be written as erri · size(qi)/size(Q) · ans(Q), where size(qi)/size(Q) · ans(Q) is an estimate of the answer of qi.

In order to select an effective partitioning of a query Q into sub-queries, the following steps are performed:

1. All SSDs containing portions of the data involved in the query are located (by accessing the Snapshot Catalog); we denote the number of these SSDs as nSSD and, for each SSDi, we denote as sqi the maximum sub-range of Q whose data are available in SSDi;

2. A grid is constructed on the range of Q by prolonging the edges of each sqi obtained at the previous step up to the boundaries of Q; then, for each sqi, the sub-queries sqij of sqi whose edges lie on this grid are considered. For each i ∈ [1..nSSD], we denote the number of these sqij as nsq(i). Fig. 3 shows an example of how such a grid is constructed and how the sub-queries sqi1, . . . , sqi,nsq(i) of each sqi are defined (for the sake of simplicity, the example considers only two sub-queries);

3. for each sqi, the estimated errors of all the sub-queries sqij (with j ∈ [1..nsq(i)]) generated at the previous step are evaluated by accessing the Estimated Error Report; these estimates are denoted as erri1, . . . , erri,nsq(i);

4. the partitions of Q with the following properties are considered:
   a. each sub-query in the partition belongs to the set of sub-queries obtained at the previous step;
   b. the estimated error (obtained by summing the estimated errors of the sub-queries in the partition) is less than the threshold errMAX;
   c. at most K sub-queries are evaluated on each SSDi involved in the query;
   d. SSDs with a large usage percentage (greater than percMAX) are not used;

5. among all the partitions described at the previous step, the one maximizing the number of SSDs accessed is selected.

Figure 3. Partitioning a query
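Before giving the formal statement of the selection problem, the following sketch (with a hypothetical report layout) shows how the estimated absolute error of a sub-query could be looked up and computed from the Estimated Error Report, using the relation erri · size(qi)/size(Q) · ans(Q) introduced above.

```python
# Sketch: estimating the absolute error of a sub-query q_i from the
# Estimated Error Report of the SSD that would evaluate it.
# The report layout (query size -> average relative error) is illustrative.

def estimated_relative_error(report, query_size):
    """report: dict mapping sample query sizes to average relative errors.
    Returns the entry whose size is closest to query_size."""
    closest = min(report, key=lambda s: abs(s - query_size))
    return report[closest]

def estimated_absolute_error(report, size_qi, size_Q, ans_Q):
    """err_i * size(q_i)/size(Q) * ans(Q), where size(q_i)/size(Q) * ans(Q)
    estimates the answer of q_i by spreading ans(Q) uniformly over Q."""
    return estimated_relative_error(report, size_qi) * (size_qi / size_Q) * ans_Q

report = {16: 0.12, 64: 0.07, 256: 0.03}     # query size -> avg relative error
print(estimated_absolute_error(report, size_qi=60, size_Q=240, ans_Q=1000.0))
# 0.07 * 0.25 * 1000 = 17.5
```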
Our problem is an instance of the constrained Set Partitioning problem [1], and can be formulated as follows:

maximize   Σ_{i ∈ [1..nSSD]} Σ_{j ∈ [1..nsq(i)]} xij

subject to:

(1)  A · x = 1

(2)  Σ_{i ∈ [1..nSSD]} Σ_{j ∈ [1..nsq(i)]} errij · size(sqij)/size(Q) · xij ≤ errMAX

(3)  Σ_{j ∈ [1..nsq(i)]} xij ≤ K,   for each i ∈ [1..nSSD]

(4)  percij · xij ≤ percMAX,   for each i ∈ [1..nSSD], j ∈ [1..nsq(i)]

     xij ∈ {0, 1},   for each i ∈ [1..nSSD], j ∈ [1..nsq(i)]

Therein: 1) xij is a boolean variable which indicates whether the sub-query sqij belongs to the partition; 2) x is the vector of all the xij; 3) A is the incidence matrix, which defines all the possible sub-queries to be considered in defining the query execution plan (due to space reasons, we do not provide a formal description of A; details on how the incidence matrix of a Set Partitioning problem is defined can be found in [1]); 4) errij is the estimated relative error of sqij; 5) percij is the usage percentage of the SSDi evaluating sqij, and percMAX is the maximum usage percentage which is tolerated.

We point out that constraint (2) implicitly bounds the number of SSDs to be accessed, as it limits the number of "small" sub-queries selected in the partition of Q. In fact, as shown in [3], the approximation error gets larger as the query size decreases, so that partitioning Q into many small queries would result in a large overall error. On the other hand, constraint (3) avoids overloading servers, and constraint (4) prevents overloaded servers from being accessed to evaluate pieces of the query. Therefore, even if the objective function maximizes the number of selected sub-queries, and thus tends to maximize the number of SSDs accessed, constraints (2), (3) and (4) make it unlikely that a congestion occurs, where either too many servers are involved in the query evaluation or some servers are overloaded.

We point out that the cost model above disregards bandwidth availability, as we assume that the messages exchanged for issuing a query on a server and receiving the query answer have a very small size. However, the formalization of the model can be easily extended to take this issue into account, as well as other aspects, such as servers with different computational power.
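The following brute-force sketch (hypothetical and practical only for tiny instances; a real implementation would use a dedicated set-partitioning solver) enumerates subsets of the candidate sub-queries, keeps those that exactly cover Q and satisfy constraints (2)-(4), and returns one maximizing the objective.

```python
from itertools import combinations

# Brute-force sketch of the constrained set-partitioning step, mirroring
# constraints (1)-(4) above. Each candidate is (ssd, cells, rel_err, perc):
# the SSD that would evaluate it, the set of cells of Q it covers, its
# estimated relative error, and the usage percentage of that SSD.
# All names are illustrative.

def best_partition(candidates, q_cells, err_max, K, perc_max):
    q_size = len(q_cells)
    best, best_score = None, -1
    for r in range(1, len(candidates) + 1):
        for subset in combinations(candidates, r):
            covered = [c for cand in subset for c in cand[1]]
            if sorted(covered) != sorted(q_cells):          # (1) exact cover of Q
                continue
            err = sum(cand[2] * len(cand[1]) / q_size for cand in subset)
            if err > err_max:                               # (2) accuracy bound
                continue
            per_ssd = {}
            for cand in subset:
                per_ssd[cand[0]] = per_ssd.get(cand[0], 0) + 1
            if any(n > K for n in per_ssd.values()):        # (3) at most K per SSD
                continue
            if any(cand[3] > perc_max for cand in subset):  # (4) no overloaded SSDs
                continue
            if r > best_score:  # objective: maximize the number of selected sub-queries
                best, best_score = subset, r
    return best

# Two SSDs, Q split into cells 0..3; SSD "A" can answer {0,1} and {2,3},
# SSD "B" can answer all of Q as a single, less accurate sub-query.
cands = [("A", [0, 1], 0.05, 40), ("A", [2, 3], 0.06, 40), ("B", [0, 1, 2, 3], 0.20, 70)]
print(best_partition(cands, [0, 1, 2, 3], err_max=0.10, K=2, perc_max=80))
```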
4 Future work
The system described in this paper is under construction: the current version is based on a Grid architecture [5], as several of its components (such as catalogs and locators) have a natural implementation in this setting, and the computational power of grids is particularly suitable in our context. Preliminary experiments show that our system provides fast answers (faster than the ones obtained by issuing queries only on uncompressed data) and guarantees the desired degree of accuracy. Further efforts will be devoted to defining ad-hoc techniques for solving the optimization problem underlying the evaluation of the query execution plan.
References

[1] E. Balas and M. Padberg, "Set partitioning: a survey", SIAM Review, Vol. 18, pp. 710-760, 1976.
[2] P. Bonnet, J.E. Gehrke, and P. Seshadri, "Towards Sensor Database Systems", in Proc. of the 2nd Int. Conf. on Mobile Data Management, pp. 3-14, Hong Kong, China, 2001.
[3] A. Cuzzocrea, F. Furfaro, E. Masciari, D. Saccà, and C. Sirangelo, "Approximate Query Answering on Sensor Network Data Streams", in A. Stefanidis, S. Nittel (eds.), Sensor-Based Distribution Geocomputing, Taylor and Francis Publishers, 2004.
[4] S. Guha, N. Koudas, and K. Shim, "Data-streams and histograms", in Proc. of the ACM Symposium on Theory of Computing, pp. 471-475, 2001.
[5] I. Foster, C. Kesselman, and S. Tuecke, "The Anatomy of the Grid: Enabling Scalable Virtual Organizations", Int. Jou. of High Performance Computing Applications, Vol. 15(3), pp. 200-222, 2001.
[6] J.M. Hellerstein, P.J. Haas, and H.J. Wang, "Online Aggregation", in Proc. of the 1997 ACM Int. Conf. on Management of Data, pp. 171-182, Tucson, AZ, USA, 1997.
[7] L. Schlesinger and W. Lehner, "Querying Asynchronously Updated Sensor Data Sets under Quantified Constraints", in A. Stefanidis, S. Nittel (eds.), Sensor-Based Distribution Geocomputing, Taylor and Francis Publishers, 2004.
[8] Y. Yao and J.E. Gehrke, "Query Processing in Sensor Networks", in Proc. of the 1st Biennial Conf. on Innovative Data Systems Research, Asilomar, CA, USA, 2003.