Applying workflow as a service paradigm to application farming

June 4, 2017 | Autor: Reginald Cushing | Categoria: Distributed Computing, Computer Software
Share Embed


Descrição do Produto

CONCURRENCY AND COMPUTATION: PRACTICE AND EXPERIENCE Concurrency Computat.: Pract. Exper. (2013) Published online in Wiley Online Library (wileyonlinelibrary.com). DOI: 10.1002/cpe.3073

SPECIAL ISSUE PAPER

Applying workflow as a service paradigm to application farming Reginald Cushing 1, * ,† , Spiros Koulouzis 1 , Adam Belloum 1 and Marian Bubak 1,2 1

Informatics Institute, University of Amsterdam, 1098 XH Amsterdam, The Netherlands 2 Department of Computer Science, AGH Krakow, PL-30059 Krakow, Poland

SUMMARY Task farming is often used to enable parameter sweep for exploration of large sets of initial conditions for large scale complex simulations. Such applications occur very often in life sciences. Available solutions enable to perform parameter sweep by creating multiple job submissions with different parameters. This paper presents an approach to farm workflows, employing service oriented paradigms using the WS-VLAM workflow manager, which provides ways to create, control, and monitor workflow applications and their components. We present two service-oriented approaches for workflow farming: task level, whereby task harness acts as services by being invoked on which task to load, and data level, where the actual task is invoked as a service with different chunks of data to process. An experimental evaluation of the presented solution is performed with a biomedical application for which 3000 simulations were required to perform a Monte Carlo study. Copyright © 2013 John Wiley & Sons, Ltd. Received 15 August 2012; Revised 17 December 2012; Accepted 18 May 2013 KEY WORDS:

scientific workflows; farming tasks; distributed workloads; service orientation; cloud resources; parametric studies

1. INTRODUCTION Many simulation-based scientific workflows involve setting appropriate initial parameters that are often determined from a large number of smaller scale test runs and automatically staged to appropriate resources [1]. The task-farming paradigm is a technique well suited to distributed architectures such as grid or clouds where multiple heterogeneous resources can be used for the concurrent task execution. Task farming is applicable to embarrassingly parallel applications where non-communicating tasks can be executed efficiently across geographically distributed computing resources. The basic operations are spawning tasks, finding appropriate computing and storage resources, and assigning work to each task. One approach to task farming is the master–worker paradigm, where a master process spawns multiple worker tasks or processes; once these initial tasks are executed, the master process creates the next tasks until the task bundle is finished. A number of potential applications follow this paradigm, such as Parameter Sweep Applications (PSAs) [1], where a potentially huge parameter space is divided into regions, and worker tasks search the specific region for optimal values. A subclass of such applications is data independent parameter sweeps where the input data set is shared among workers. Farming tasks deals with splitting data or parameter space n-wise among n identical tasks. This technique speeds up data processing, especially when dealing with independent tasks. Farming can make better use of the resource by elastically replicating tasks to reduce empty resource slots while *Correspondence to: Reginald Cushing, Informatics Institute, University of Amsterdam, Science Park 904, 1098 XH Amsterdam, The Netherlands. † E-mail: [email protected] Copyright © 2013 John Wiley & Sons, Ltd.

R. CUSHING ET AL.

reducing the workflow makespan. If we consider data D to take time T1 to process on one node and Tn on n nodes when dividing D among the n nodes, the ideal speedup is n and is defined as T1 . A close to ideal speedup can be achieved when assuming independent tasks with negligible Tn communication overhead. Farming allows the execution of large number of experiments, each of which may have a different input data set or a different parameter set. A number of the preparation steps involved in these experiments can be automated: staging in and out the appropriate input and result data sets, finding available and appropriate computing and storage resources, and so on. Another important aspect is the reproducibility of results. When a scientist has performed thousands of simulations and a few of them fail, he is interested to know which ones failed and the context in which they have been executed (computing and storage resources, libraries, input data sets, parameter values, and so on). This requires appropriate monitoring and provenance systems. This paper describes a new approach to support the farming of a large number of scientific applications using Workflow as a Service (WFaaS) paradigm. We present two service-oriented paradigms for workflow farming: task level, whereby task harness acts as services by being invoked on which task to load, and data level, where the actual task is invoked as a service with different chunks of data to process; we provide solutions to the automation of staging files, selecting suitable resources, monitoring and interacting with running experiments, and tracking of provenance to enable reproduction of results. The paper is structured as follows: Section 2 presents the related work; Section 3 describes the framework we have developed to support farming; Section 4 introduces the concept of WFaaS, which improves the farming performance; in Section 5, we describe how the architecture is used with cloud resources; in Section 6, we present workflow applications developed and running with the framework described in this paper, and we discuss a number of issues that emerged when running these applications. Concluding remarks are outlined in Section 7. 2. RELATED WORK A number of tools, such as Nimrod/G [2], NetSolve [3], Ninf [4], AppLeS [5], have been developed, and they offered task-farming facilities while hiding the low-system level details. The way these tools expose the task farming facilities vary from one tool to another: Nimrod, NetSolve, and Ninf, are equipped with simple APIs, whereas AppLeS has a complete programming environment. Nimrod/G and GridSolve [2, 3] provide task farming capabilities in a grid environment. Nimord/G [2] is a system that aims at scheduling parameter sweep studies on grid architectures (Globus). Nimrod provides a declarative language for describing parametrized experiments. The core part of the architecture is a parameter engine that is responsible for parameterizing the experiment, creating jobs and mapping tasks to the resources through a schedule advisor. The scheduling approach in Nimrod/G is based on grid economy with deadlines. This tries to achieve trade-offs between performance and cost [6]. The GridSolve system works in a similar fashion, exploiting an agent to maintain details about available servers and then selecting resource on behalf of the user. NetSolve has several specialized execution mechanisms that support common computing models. Another similar application is AppLeS [5], which focuses on the scheduling problem and provides various solutions such as self-scheduled work queue and adaptive scheduling with heuristics. Other parameter sweep efforts focus on extending known systems such as Kelper [7] to include parameter sweep capabilities. One such attempt proposes a master–worker architecture for the Kelper system [8]. The architecture targets networked computing resources. A master node initiates the workflow execution that manages a swarm of worker nodes to execute sub-workflows. Through this architecture, worker nodes are able to run concurrently and process different data. The BOINC [9] is a task farming, CPU scavenging framework that has been popularized by SETI@home. BOINC architecture is centralized, and clients log into servers asking for work. The BOINC system harnesses a wider distributed system through volunteer computing, whereby any user on the Internet can take part in the system by donating computation and storage to be used by BOINC. Applications running on the BOINC system are largely independent and hence can scale quite well on such architectures. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

Parameter sweeps are a special kind of task farming with the difference that the data being split amongst the task pool is the set of parameters. PSAs are characterized by an embarrassingly parallel application, which is an application that can be decomposed into many independent tasks with little or no synchronization or data dependencies. Parameter sweep model is a simple yet powerful concept used by many scientific application such as those in computational fluid dynamics, bioinformatics, particle physics, discrete event simulation, and computer graphics [6]. In PSA, data is replicated to all collaborating tasks while each task is given a set of different parameters, where each task in PSA works on identical data. Because PSAs are intrinsically independent, they can tolerate network latency and therefore scale to large distributed architecture. Additionally, they are amenable to simple fault tolerance mechanisms such as retries [6]. Most of these systems are developed on top of lower-level tools such as Globus, Legion, or Condor. The types of jobs submitted by using these systems are common scripts and executables that do not have an intrinsic ability to act as a service, if not specifically programmed by the application programmer. In our proposed method, a part of the runtime executes alongside the job (referred to as task harness), which enables the task to act as a service. An approach to WFaaS is described in [10]. This approach entails wrapping a Taverna workflow into a single web service by using gRAVI service wrapping tool with the aim of enhancing reusability and accessibility of workflows. This notion of WFaaS differs from our proposed method, whereby our aim is rather on better scheduling and enactment of the workflow rather than packaging a workflow into a single web service. Paper [11] describes how cloud resources are used for workflow execution with the aid of web services (e.g., resource provisioning). This also differs from our approach although in our solution, the workflow itself and the executing jobs are acting as services. 3. WS-VLAM WORKFLOW MANAGMENT SYSTEM The WS-VLAM is a follow-up of a grid-based workflow system (VLAM) that aimed at covering the entire life cycle of scientific workflows [12]. VLAM had two levels of abstraction, one covering the design phase of a scientific workflow and another taking care of the execution phase. VLAM end-users were able to share workflows, reuse each workflow components, and execute workflow on resources across multiple organizations. 3.1. Model of computation The WS-VLAM workflows are represented as Directed Acyclic Graphs (DAGs). Vertices in the graphs represent computation as tasks, whereas edges represent data communication and dependency. Each task has a number of typed input and output ports. These ports represent data channels between tasks in the workflow. The links between data channels represent data dependencies. This allows the enactment engine to model the workflow as a dataflow graph. We model a workflow W as a set of interdependent dataflow tasks ft1 , t2 , ..., tn g that are matched to the set of resources R. Tasks are represented as tuples < id , st , IP , OP , P T , DT , IC , OC >, where id is the task id, st is the allocated computing slot time for a given task, IP is the set of input ports, OP is the set of output ports, P T is the set of tasks that precede task tid where P T  W , DT is the set of dependent tasks that follow task tid where DT  W , and IC is the set of input data channels between output ports of tasks in set P T to input ports for task tid . Similarly, OC is the set of output data channels between output ports for task tid to input ports of tasks in DT . Ports consume and produce a set of messages fm1 , m2 , ..., mn g; messages are consumed and produced sequentially. The dataflow model dictates that a task tk will only be matched to a resource in R when, for each input port IPtk , the first message m1 is delivered. This means that only those tasks that actually have work to do will be scheduled to a computing node. The advantage of this approach over co-allocating the whole workflow resources at the beginning are twofold: tasks do not idle while waiting for data to start processing because data is available before the start of execution,

Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

Figure 1. Workflow management system architecture. The central component is a message exchange server that allows task communication and workflow coordination. Most components are decoupled and communicate through the message exchange. The components of the architecture are split into two main regions: the server side where most of the coordination takes place and the resource/worker-node side where the harnesses are running.

and co-allocations are known to degrade the system because of increased task waiting times [13,14]; thus, removing the need to co-allocate resources eases the scheduling complexity on the shared resource. 3.2. Core modules and architecture The core components of the WS-VLAM architecture are illustrated in Figure 1. The central component is the message exchange server that acts as the means of communication between workflow tasks, coordination, and provenance capture. Workflow scheduling is achieved in two steps: a toplevel scheduler, Enactment Engine, which is responsible for orchestrating the workflow, and a bottom-level scheduler, Resource Submission Scheduler, which is responsible for scheduling tasks onto resources through matchmaking. The bottom level scheduler can manage multiple resources such a grids, clouds, and clusters. This is carried out through Submitters that can talk directly to the underlying resources and submit jobs. For example, a cluster submitter would interface to a queue manager such as Condor, whereas a cloud submitter would interface to a cloud manager such as OpenNebula and initiate virtual machines. The bottom-level scheduler will direct jobs to different submitters depending on the scheduling policy. A scheduling policy would dictate how jobs are distributed to submitters; one could use cloud resources to aid in execution of large workflows whereby the bottom-level scheduler first tries to fill up traditional distributed resources and then overflow jobs to virtual resources. Farming and scaling capabilities are achieved through a dedicated entity that sits on top the message queue. Because workflow tasks communicate over the message exchange, the latter can be used to inspect the state of the workflow and deduce which tasks are overloaded by looking at their respective data queue sizes. Tasks are able to write provenance information to dedicated queues that are then consumed by a Provenance Extractor. The system collects two types of provenance data: system-level provenance and application-level provenance. The unit of submission is a Python harness. This harness can be considered as a smart late-binding mechanism with communication and provenance capture capabilities. The harness implements a Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

plug-in system where workflow tasks can be plugged in dynamically into any harness running on any resource. Furthermore, each harness can run multiple tasks thus optimizing resource usage. Communication between workflow tasks is also abstracted, which alleviates the programmer from the underlying communication intricacies. The programmer merely reads and writes to a port while the harness takes care of the rest. By means of the harness, the system can implement the WFaaS paradigm on two levels, the data level and the task level. These two levels of WFaaS are discussed in detail in Section 4.

3.3. Provenance Subsystem Provenance helps to track back the execution of the workflow and provides information that can help either in reproducing successful workflow execution or discovering problems that led to faulty execution. Provenance may also provide means to create links between publications and data sets, allowing to repeat published experiments; it helps in managing data-lineage and solving the questions of trust and reputation. In case of workflow applications, provenance data has to be collected at each phase of the workflow life cycle starting from workflow design, going through the prototyping and calibration phases and ending by the execution and result analysis. System level provenance keeps track of a context in which a simulation experiment has been performed: when the workflow was executed, on which machines, which libraries and data have been used, been produced and so on. This information is used for debugging by workflow developers. Users of the workflows can require this information for validation and reproducibility. Application level provenance concerns information that had a direct impact on a successful workflow execution or a faulty one. This kind of provenance allows the scientific programmer to record important events regarding the data processing such as at which iteration a simulation is converging. A new service has been added to the WS-VLAM basic service to handle the collection of provenance data and save it in a persistent storage. WS-VLAM makes extensive use of events (most of changes in the status of the components and services composing the WS-VLAM system trigger an event); hence, it was straight forward to add a new provenance service that subscribes to any event considered interesting from a workflow provenance point of view. The provenance events are then collected and structured according to the Open Provenance Model [15]. Provenance information is collected through the messaging system where dedicated message queues are used to collect provenance events. The first step in generating the provenance data is to convert the static workflow description into an OPM graph; this generates the skeleton of the workflow provenance with some static information known before the execution of the workflow. The generated OPM graph is populated with events collected at runtime. OPM artifacts will be added to the OPM graph on the corresponding nodes, and at the end of the workflow execution, the final OPM graph is saved in the provenance database [16] In parameter sweep analysis, thousands of workflows are farmed and executed independently. The runs are considered as part of one unique experiment. To achieve this, the concept of an account is introduced in the OPM specification. In other words, the OPM graph of a parameter sweep workflow will have as many accounts as the number of runs, and each account is a completed OPM graph representing one instance of the workflow execution. From system-level point of view, provenance data is needed mainly for debugging, accounting, and system performance analysis. It is likely that the provenance query will drive the following elements: workflow_name, user_name, date and time the workflow was submitted and finished, state of the workflow when it completed, the list of failed runs of a given parameters sweep experiments, the list of workflow components that failed in a specific workflow run, and the logs of the failed workflow components (stdout, stderr). From application point of view, provenance data is needed for the analysis of the results so as to trace back results to data sources and the methods of processing the data. It is likely that the provenance query will be driven by the application-specific elements such as parameter changes, some intermediate results, and input data sets. WS-VLAM workflow developers can use the WS-VLAM Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

parameter interface to trigger any events relevant to the analysis of the workflow execution; any changes to these parameters will be recorded at runtime and added to the OPM graph. The collection of provenance data follows this scenario: when the end-user submits the workflow for execution, an initial OPM graph is created. When provenance events start firing, the provenance service updates the OPM graph. The final OPM graph is saved into a provenance database. A query interface allows the end-user to search throughout the provenance data. 3.4. Monitoring Scientists running complex workflow applications are interested in a gradual and intuitive presentation of the information about running tasks. In WS-VLAM, the monitoring is performed at two levels: the workflow level and workflow component level. At the workflow level, the end-user can follow the state of a workflow submission and check whether the workflow is pending, submitted, running, or completed. A workflow submission is considered as failed if one of its components has failed. The scientist can then decide to investigate the reasons for the failure by looking at the data and logs collected from the components that have been reported as having an abnormal state. Each workflow component publishes a number of events to which the WS-VLAM monitoring client can subscribe. In this way, the published information are collected and relayed to the end users. In WS-VLAM, the following events are published by each workflow component: (i) standard error event, (ii) standard output event, and (iii) graphical event. Notification events are gathered from various entities such as the resource submitters and the python harnesses. These are used for monitoring of workflow execution. By default, the WS-VLAM client application subscribes to the notification generated by the workflow engine, which gives information about overall workflow execution status. The workflow engine subscribes to notification provided by the workflow components to receive updates about their respective execution status. Events are collected from the resource allocation manager via callback mechanisms provided at the workflow component level. In this way, the WS-VLAM client application is able to collect the overall execution status and the default output and error streams for each workflow component. 4. WORKFLOW AS A SERVICE The concept of workflow as a service has been elaborated to increase the performance and minimize the overheads of workflow farming. In the initial scenario, a workflow is submitted to computational resources to process a particular set of data and input parameters; after the processing is finished and the results are collected, the workflow is gracefully terminated. When the next set of data and parameters is to be processed, the workflow is started again, which means that all the workflow components have to be rescheduled, most likely, on a different set of grid resources. In grid and other shared environments, these activities form a significant overhead due to queue waiting times for resource acquisition and staging-in overheads. One way to avoid such overheads is to keep the workflow running on the resources even after a particular data set has been processed. The next data set can be assigned to the workflow that is already instantiated; the parameters for the next execution can also be changed at runtime. This approach helps to save the time of executing the whole workflow each time. Such a concept of user-level preliminary allocation of resources has been employed for user-level scheduling and execution of multitude of short-running jobs on grid resources [17]. Because workflows are kept running, waiting to process new data or parameter sets, they behave as services, hence the Workflow-as-a-Service or WFaaS paradigm. In this way, for workflow farming, only a limited set of workflows or workflow tasks have to be executed and kept running on the resources, see Figure 2. Another approach is to reuse computing resources. In many workflows, some tasks are short lived, and others are long lived. This creates a situation where short lived tasks spend more time stuck in queues than actually spend time executing. A better solution would be to reuse the resource acquired by short-lived jobs to run other workflow tasks, thus reducing waiting times. Because harnesses are kept running, waiting to start new tasks, they also behave as services and adds to the WFaaS paradigm. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

Figure 2. Sequence of events following the WFaaS paradigm. The enactment engine activates a task that in turn is passed onto the submitter. Having already a harness running on some resource, the submitter invokes the harness to load the task. Once loaded, the methods within the task can start processing data. The task methods can be invoked over multiple data chunks, thus the WFaaS paradigm; furthermore, the void harness can be re-invoked with a new task, thus acting as a service at the task-level also.

The WFaaS paradigm is described at two levels – data level and task level. To better describe the distinction between these levels, we extend the definitions introduced in Section 3.1 by adding a new set, H , which is the set of harnesses (a.k.a. containers) hosting the set of tasks ft1 , t2 , ..., tn g pertaining to workflow W on the set of resources R. The traditional approach of mapping the set of tasks, W to the resources, R is carried out directly through a scheduler, shed W W ! R. In tasklevel WFaaS, we use an intermediate mapping such that a scheduler maps the set of harnesses to the resources, shed _harness W H ! R, and shed _t asks W ti ...j 7! hn maps an arbitrary number of tasks to a single harness instance, hn . This allows a harness to process multiple tasks, thus achieving task-level WFaaS. Furthermore, data-level WFaaS is achieved by invoking tasks with multiple data. In our module, the unit of data is a message; thus during the lifetime of a task, the set of processed data equates to the set of messages, M , consumed by the task. Similarly, to task-level WFaaS, shed _dat a W mi ...j 7! tn , an arbitrary number of messages can be mapped to a single task; thus, we do not need to create a separate task for every data message. 4.1. Task harnessing The WFaaS is enabled through the use of task harnessing. The Python task harness implements WFaaS on two levels – the data-level and the task-level. Data-level is achieved by invoking the same task over different data. This translates into steps (1) to (4) in Figure 3. Task-level serviceoriented characteristics are achieved by invoking different tasks on the same harness; thus, a harness acts as a container akin to web service containers. The latter is illustrated as step (5) in Figure 3. A task harness takes care of the runtime execution of a workflow task on a computing node; thus, by keeping the task alive and providing successive parameters, the task acts as a service. The harness architecture (Figure 3) is based on a plug-in model whereby the task and communication modules are dynamically loadable into a harness program. A simple task module is listed in Listing 1. The harness is responsible for executing the scientific task and abstract the task communication. Each input and output port on a task is bound to a message queue on the exchange server. The harness is responsible of subscribing to such queues and retrieve data. This data is then moved to the relevant ports on the task. Large data is not passed through the message exchange as this would overload the server. Therefore, data is referenced, whereas the Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

Figure 3. A task harness is submitted to a computing node. The actual task is dynamically loaded into the harness. The harness handles the runtime execution of the task by keeping the task alive and realizing the WFaaS paradigm. In (1), the harness acquires data from the message queue that is bound to the input port of the task; (2) the data is moved to the task, and at this stage, the task performs the necessary processing on the data and writes the output to one of its output ports; (3) the harness then acquires the data and sends it to message exchange (4) to be consumed by other tasks in the workflow. Steps (1)–(4) repeat themselves until all data on the message queue is consumed. After terminating the task, the harness does not quit the resource but loads the next task in the job queue (5) and repeats steps (1)–(4) again.

actual data is resident on some dedicated storage such as webdav and gridftp. The harness is also responsible for downloading the actual data from the reference in the message. Large output data is automatically copied to a dedicated storage, and the reference to it is sent to message server. The output data server can be dynamically choosing from a set of configured servers by choosing the closest one. Listing 1 shows a typical workflow module; Line 1 names the task/module. This is the name given to the task in the workflow. Lines 2, 6, and 15 define three function that are implemented by the scientific programmer. Upon loading the module, the harness calls on_load() where initialization can take place. After on_load(), the harness calls run() on a separate thread. run() is the main routine where most scientific logic takes place. The harness offers easy ways how to read data; line 10 reads a data chunk from the tasks input port. After processing the data, output results can be simply written to the output port in line 13. The harness also allows for in-application provenance gathering whereby the programmer can write events, line 12, which are collected by the system. When no more data is available on the input ports, on_unload() is called, which is intended to run clean-up routines. The harness will then proceed to unload the module and load a new one. The lower example in Listing 1 shows the same module implemented by using callback functions to process data. Line 22 registers the callback function for the input port. For every data chunk (parameter body in Line 28), the callback data_processor() at Line 28 is called to process the data. Tasks participating in WFaaS execution communicate with each other through messaging. Each task in the workflow polls a message server for new inputs, be it parameters or data. This polling mechanism circumvents common network restriction on computing nodes that tend to block listening ports. With many polling tasks, a server can easily be inundated with polling requests, which is a common problem known as a thundering herd problem. This occurs when many tasks decide to poll the server at the same time. To limit this problem, we implemented an exponential back-off where tasks exponentially increase their own polling interval when no new messages are retrieved. Each port on a task is associated to a message queue on the message server. Task communication is achieved by routing output messages from one task to the input message queues of the next. Tasks act as services by continuously consuming new parameters and terminate once all input queues are exhausted of data or the allotted time on the computing node has expired. Replicas of the same task are attached to the same queues; hence, data is automatically partitioned among instances of the same task. This system of message queues depicted in Figure 4 allows tasks to scale so that each task processes a subset of the parameter space instead of just processing one parameter and immediately terminate. Data scheduling is also carried out through the message queues attached to the ports on the messaging system. Data is partitioned through shared queues where multiple tasks access the same queue and in turn retrieve data messages. Data that is not meant for partitioning is fanned out to all replicated task input ports; thus, each task has a copy or reference to the same data. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

Figure 4. Tasks participating in a parametric sweep study. Each replica tasks reads parameters as data from their input ports that are bound to message queues. Because the input message queue is shared between all replicas, parameters and data are automatically partitioned among a farm of tasks. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

Figure 5. Parameter to task mapping. Left, traditional mapping where each parameter is mapped to a single task. Center, the whole parameter space is mapped to a single task instance. Here, the task is working as a service, but this solution may not be optimal for large-parameter spaces. Right, subsets of the parameter space are assigned to replicas of the different task. Each task is working as a service processing part of the parameter space.

4.2. Task scaling Having a single workflow instance, processing all the parameter space might not always be optimal as shown in Figure 5, especially when resources are under utilized; thus, the workflow management system has to leverage the number of workflows for better resource utilization by taking into consideration the size of parameter space and the available resources. This is carried out through two main scaling strategies – fixed-scaling and auto-scaling. In fixed-scaling, the WS-VLAM initiates a fixed number of workflow instances, wheras with auto-scaling, WS-VLAM automatically replicates the workflow by looking at the parameter space and the time taken for each parameter to be processed. Auto-scaling is based on estimating the predicted task execution time. Through queue monitoring, the system can deduce the data processing rate of a given task. This is calculated by using the Message Interval Times (MIT), which is the time between each message being consumed by a task. MIT is an estimation of how long a particular data chunk took to process. By using the mean MIT and the size of the queued data waiting to be processed, we found that the system can estimate the execution time needed to process this data. The system can then decide to start replicating specific tasks within the workflow to reduce the execution time within a certain threshold. This method of prediction-based auto scaling is described fully in [18]

5. USING CLOUD RESOURCES FOR SCIENTIFIC WORKFLOWS The nature of WS-VLAM architecture allows easy integration with cloud resources. From Figure 1, we recall that resources are accessible through Submitters, and a Resource Scheduler is responsible for distributing jobs onto the submitters. Using cloud resources is therefore possible by having the adequate submitters in place that can initiate virtual machines and submit resources to the virtual machines. Furthermore, the method of task harnessing simplifies the job submission onto virtual machines. The traditional way to accomplish this is to setup a cluster queue manager such as Condor on a virtual cluster. Setting up such queue managers can be problematic especially when dealing with firewall restrictions. The system of task harnessing does not need any queue manager for job submission because the harness itself grabs jobs from a shared queue. The pull model mechanism also means that the harness can work around many firewall restrictions. This is also true for task-to-task communication, as communication can be proxied through the messaging servers; thus harnesses can reside anywhere where an outbound Internet connection is available. These mechanisms provide an easy and robust way to exploit cloud resources. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

Figure 6. Using cloud resources. For each cloud provider, a Submitter must be available. The Resource Scheduler distributes jobs onto the submitters that in turn talk to the cloud providers. The cloud submitters have two main responsibilities – starting up virtual machines and submitting jobs. Each virtual machine runs a void Python harness that is responsible of grabbing jobs. The harnesses on each virtual machine talk to each other by using a message exchange.

Figure 6 depicts the typical setup when using cloud resources. The Resource Scheduler picks up jobs from the workflow Enactment Engine. Having a Submitter for each cloud provider, the scheduler can distribute jobs to the different providers. Because submitters for traditional resources such as grids can also coexist with cloud submitters, the scheduler can mix and match job placement. Once a job is submitted through a cloud submitter, the latter can either fire up a new virtual machine using the providers API or else submit the job to a void harness already running on an active virtual machine. Virtual machines are primed with the harness; thus, the Python harness is always running on a virtual machine and continuously consume workflow tasks. In our experimental setups with cloud resources, we defined a scheduling policy that uses the concept of budgets; each cloud submitter manages a budget assigned to it. The intention of budgets is to use public cloud resource within a certain monetary budget. Each cloud submitter also maintains a reserve budget that is used to gracefully terminate a virtual machine when the real budget has run out. Without such a reserve budget, virtual machines are abruptly terminated and useful work lost. 6. AN EXEMPLARY BIOMEDICAL WORKFLOW APPLICATION The new approach described in the previous sections was applied to a biomedical study for which 3000 runs were required to perform a global sensitivity analysis of a blood pressure wave propagation in arteries (Figure 7). Patient-specific simulations involves many parameters based on data measured in vivo and subject to uncertainties [19]. The relationship between the model parameters and the simulated output is complex. Thus, a global sensitivity analysis is an appropriate method to investigate how the uncertainties in the model output can be attributed to the different sources of uncertainty in the model input. A patient-specific model was set up for the major arteries of the arm. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

Figure 7. Screenshot of the scientists’ desktop. On the top right corner, the scientist can compose the workflow in an intuitive way. The user can also specify how he/she wants to farm his workflow (list of input files and a range of application parameter). When the user executes the workflow, a monitoring window (window on the top left corner) shows the farmed workflows, and the user can monitor each run separately (cascade of windows on left bottom corner).

In a Monte Carlo study, 11 model parameters (e.g., Young’s modulus, vessel diameter, and artery length) [20] were varied randomly within their respective uncertainty ranges over 3000 model runs. Because input uncertainties of wave propagation models can not be neglected, further development toward an automatic evaluation of patient-specific parameters will be based on techniques that require even larger number of model runs such as probabilistic inversion methods [21]. It would then be interesting to further develop this application to provide more information about the simulation processes to the user (e.g., the durations of each simulation, failures, and used resources). The primary goal of running the use case was to speed up the entire sets of farmed workflow; this speed up is limited by Virtual Organization (VO) membership of the users: the more this VO membership gives access to computing resource, the more the system will be able to farm concurrent jobs and the faster is the execution of the entire experiment. In the current setting, the workflows are farmed by groups of fixed size; if more resources become available, new groups can be farmed. The preliminary results were obtained on the Dutch ASCII supercomputer (http://www.cs.vu.nl/das3/). Figure 8 shows the execution and waiting times obtained by using two scheduling approaches: WFaaS (left) and the original WS-VLAM farming (right). Each workflow submitted by using WFaaS approach performs multiple simulations; any computing resources that become available is added to the pool of resources to process the remaining simulations. In total, 28 workflows performed all the 100 targeted simulation. Although in the original WS-VLAM farming, each submitted workflow performs only one simulation, which leads to 100 separate submissions. It is clear that when computing resources are limited and multiple applications are competing to gain access to these resources, the WFaaS approach has a significant advantage as it reduces the waiting time considerably, leading to the overall speed up of the entire set of farmed workflows. The Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING Performance on DAS3 - 30 Jobs - 100 Parameter Sweep

Performance on DAS3 - 30 Jobs - 100 Parameter Sweep 06:00

04:00:00

Pending time Running time

Pending time Running time

03:30:00

05:00

03:00:00 04:00

Time

Time

02:30:00 02:00:00 01:30:00

03:00

02:00

01:00:00 01:00

00:30:00 00:00:00

00:00 0

5

10

15

20

Job#

25

30

0

20

40

60

80

100

Job#

Figure 8. Performance of WFaaS. On the left, 100 simulations of the wave workflow takes about 3 h and 15 min using the WFaaS; on the right, the same number of simulations take 5 h and 15 min farming each workflow separately. Each workflow submitted by following the WFaaS approach performs multiple simulations that reduce considerably the waiting time. In both cases, workflows are competing to use 28 computing nodes. For the WFaaS example, 30 tasks were submitted; thus the last two tasks are stuck on waiting queues. Once the slots are freed, they terminate immediately because the other tasks performed the work by using WFaaS.

Figure 9. A skeletal workflow intended to simulate blood flow for treatment of thrombosis. This is an extension of the first experiment and is intended to run patient-specific blood flow simulation by using many input parameter variations [22].

overhead of the WS-VLAM in terms of data movement among the workflow component is low and has been discussed in [12]. A second example (Figure 9) aims at demonstrating the WFaaS at the task-level. This experiment is an extension of the previous example where the core module SA_analysis contains the logic of the previous example. In this experiment, the tasks are given short workloads to simulate short-lived tasks. This workflow is executed on the BigGrid (Dutch e-science grid). With short lived tasks, resources are occupied for a relatively small time span making the queue waiting more apparent, which cause noticeable overhead. The experiment was run on the BigGrid (http://www.biggrid.nl/), which is the Dutch grid initiative. With task-level WFaaS, we can instruct a harness to load a sequence of tasks instead of just loading one task and terminate immediately. This mechanism makes most use of the resource allocated to the workflow. The results in Figure 10 show the difference of the workflow run with task-level service-oriented characteristics (right) and without (left). On the left shows the typical scenario with grid queue waiting times; queue waiting times are unpredictable and can vary from seconds to hours. Queue waiting times in shared grid resources play a crucial role in overall performance of workflow execution because they can span from minutes to hours [23]. This tend to cause problems in data intensive workflow execution because data produced by a task, for example runMC, needs to be buffered until the consuming tasks (SA_analysis) come into play. The data being produced might be too large to be buffered; thus, the faster the consumers start executing and the better the system can handle the load. This is achieved through task-level WFaaS (Figure 10 (right)) where acquired resources are reused to execute multiple tasks in sequence. The first task runMC has the highest queue waiting time because it has to pass the submission queues. Other tasks can be loaded immediately into already existing harnesses without waiting on any queue. This results in a reduced and predictable queue waiting time. Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

Figure 10. Workflow as a Service at task-level. On the left, traditional workflow submission with unpredictable and potentially large queue waiting times. On the right, workflow harness jobs act as services by running multiple workflow jobs, thus circumventing queue waiting times. The apparent queue waiting time on the right is due to the harness job polling interval.

7. CONCLUSIONS We have shown that the WFaaS approach to task farming is a very promising approach for large parametric studies. The WFaaS paradigm at the data level as well at the task level reduces common scheduling overheads such as queue waiting times in shared distributed infrastructures and makes better use of the computing resources by making most of the allocated time slot given to each task. These methods were presented within the context of WS-VLAM, a scientific workflow management system and some important features were presented that allows scientists to build, run, and reproduce their experiments through provenance techniques. The method of resource submitters coupled with task harnessing allows easy cloud integration to exploit cloud resources for scientific computing without the need to setup complex virtual cluster middleware. Another advantage of WS-VLAM is that it gives seamless access to large computing and storage power, which can speed up the execution of scientific experiments. The end user is able to easily create and execute workflows in a grid environment, interactively control and monitor the components composing the application workflow, farm a large number of workflows, and collect provenance data about the execution of the simulations (Figure 7). A number of tasks that took a lot of time and prevented the scientist from focusing on the application were delegated to the WS-VLAM framework. These include automatically staging in/out multiple input/output files, the ability to run multiple simulations in parallel on grid and cloud resources, the possibility to retrieve the status of the running simulations at run time and to verify whether there are non-converging simulations or not, and the automatic collection of data provenance, which helps to trace back the reasons for failure and successful runs. From workflow management point of view, WS-VLAM is following the hybrid approach where data flows throughout the application workflow in P2P fashion, making it possible to design application workflows that require the execution of concurrent processing, although the control flow follows a message-passing paradigm going through the workflow engine. For applications implemented as web services, WS-VLAM has been extended with a new service, which can be deployed next to the application services to solve the data isolation problem. The service redirects data sets to the appropriate communication protocol, avoiding large data sets to be moved through SOAP calls. As cloud computing is garnering popularity within the scientific community, we are evaluating the use of cloud computing for farming workflows. In addition to the work described in Section 5, we are particularly interested in data movement and locality as this is one of the main hurdles in data-intensive scientific workflows. A particular challenge is to investigate how to manage multiple virtual machines from different cloud providers given the limitations such as bandwidth, latency, and vendor lock-in. Virtual resources also gives us the ability to further test the scalability of system with increased number of resources From a workflow point of view; virtual infrastructures provide Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

APPLYING WORKFLOW AS A SERVICE PARADIGM TO APPLICATION FARMING

a dynamic configurable resource and might pave the way to a new paradigm where infrastructures morph and adapt to the application instead of the other way around, as is traditionally carried out. ACKNOWLEDGEMENTS

This research was partially funded by the COMMIT‡ and VPH-Share§ projects. We also like to thank Dmitry Vasyunin, Vladimir Korkhov, Carole Leguy, and Wouter Huberts for their contribution to the applications. REFERENCES 1. Allen G, Goodale T, Russell M, Seidel E, Shalf J. Classifying and Enabling Grid Applications. In Grid Computing: Making the Global Infrastructure a Reality, Berman F, Fox G, Hey AJG (eds). John Wiley & Sons, Ltd, 2003; 601–614. 2. Abramson D, Buuya R, Giddy J. A computational economy for grid computing and its implementation in the Nimrod-G resource broker. Future Generation Computer Systems 2002; 18(8):1061–1074. 3. YarKhan A, Dongarra J, Seymour K. NetSolve to GridSolve: The Evolution of a Network Enabled Solver. IFIP WoCo9 conference “Grid-Based Problem Solving Environments: Implications for Development and Deployment of Numerical Software”, Prescott, AZ, July 17–21 2006; 215–224. 4. Tanaka Y, Nakada H, Sekiguchi S, Suzumura T, Matsuoka S. Ninf-G: a reference implementation of RPC-based programming middleware for grid computing. Journal of Grid Computing 2003; 1(1):41-51. 5. Casanova H, Obertelli G, Berman F, Wolski R. The AppLeS Parameter Sweep Template: User-level Middleware for the Grid, Sci. Program. Proceedings of the 2000 ACM/IEEE conference on Supercomputing ’00. IEEE Computer Society, Washington, DC, USA, 2000. Article 60. 6. Casanova H, Berman F. Parameter Sweeps on the Grid with APST, Concurrency: Pract. Exper. John Wiley & Sons, Ltd, 2003. 7. Ludäscher B, Altintas I, Bowers S, Cummings J, Critchlow T, Deelman E, Roure DD, Freire J, Goble C, Jones M, Klasky S, McPhillips T, Podhorszki N, Silva C, Taylor I, Vouk M. Scientific Process Automation and Workflow Management. Chapman & Hall, 2009. 8. Wang J, Altintas I, Hosseini PR, Barseghian D, Crawl D, Berkley C, Jones MB. Accelerating Parameter Sweep Workflows by Utilizing Ad-hoc Network Computing Resources: An Ecological Example. In Proceedings of the 2009 Congress on Services - I (SERVICES ’09). IEEE Computer Society, Washington, DC. USA 2009:267–274. 9. Anderson DP. BOINC: A System for Public-Resource Computing and Storage. In Proceedings of the 5th IEEE/ACM International Workshop on Grid Computing (GRID ’04). IEEE Computer Society, Washington, DC, USA, 2004; 4–10. 10. Tan W, Chard K, Sulakhe D, Madduri R, Foster I, Soiland-Reyes S, Goble C. Scientific workflows as services in caGrid: a Taverna and gRAVI Approach. IEEE International Conference on Web Services, 2009. ICWS 2009, Los Angeles, CA, USA, 6–10 July 2009; 413–420. 11. Pandey S, Karunamoorthy D, Buyya R. Workflow engine for clouds. In Cloud Computing: Principles and Paradigms. John Wiley & Sons, Inc, 2011; 321–344. 12. Korkhov V, Vasyunin D, Wibisono A, Belloum ASZ, Inda MA, Roos M, Breit TM, Hertzberger LO. VLAM-G: interactive data driven workflow engine for grid-enabled resources. Scientific Programming 2007; 15(3):173–188. 13. Elmroth E, Hernández F, Johan T. Three fundamental dimensions of scientific workflow interoperability: model of computation, language, and execution environment. Future Gener. Comput. Syst 2010; 26(2):245–256. 14. Smith W, Foster I, Taylor V. Scheduling with Advanced Reservations. Proceedings 14th International Conference on Parallel and Distributed Processing Symposium (IPDPS), Cancun, Mexico, 2000; 127–132. 15. Moreau L, Freire J, Futrelle J, McGrath RE, Myers J, Paulson P. The Open Provenance Model: An Overview, Provenance and Annotation of Data and Processes. Lecture Notes in Computer Science, (5272/2008) 2008:323–326. 16. Gerhards M, Skorupa S, Sander V, Belloum A, Vasunin D, Benabdelkader A. HisT/PLIER: A Two-Fold Provenance Approach for Grid-Enabled Scientific Workflows Using WS-VLAM. In Proceedings of the 2011 IEEE/ACM 12th International Conference on Grid Computing (GRID ’11). IEEE Computer Society, Washington, DC, USA, 2011; 224–225. 17. Korkhov V, Moscicki J, Krzhizhanovskaya V. Dynamic workload balancing of parallel applications with user-level scheduling on the grid. Future Generation Computer Systems 2009; 25(1):28–34. 18. Cushing R, Koulouzis S, Belloum ASZ, Bubak M. Prediction-based autoscaling of scientific workflows. In Proceedings of the 9th International Workshop on Middleware for Grids, Clouds and e-Science (MGC ’11), Lisbon, Portugal, 2011. 19. Leguy CAD, Bosboom H, Gelderblom APG, Hoeks FN, de Vosse V. Estimation of distributed arterial mechanical properties using a wave propagation model in a reverse way. Medical Engineering & Physics November 2010; 32(9):957–967. ‡ §

www.commit-nl.nl www.vph-share.eu

Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

R. CUSHING ET AL.

20. Cushing R, Belloum ASZ, Korkhov V, Vasyunin D, Bubak M, Leguy C. Workflow as a service: an approach to workflow farming. 3rd International Workshop on Emerging Computational Methods for the Life Sciences (ECMLS), New York, NY, USA, 2012; 23–31. 21. Du C, Kurowicka D, Cooke R. Techniques for generic probabilistic inversion. Comput. Stat. Data Anal 2006; 50:1164–1187. 22. Huberts W. Personalized computational modeling of vascular access creation. Ph.D Thesis, University of Maastricht, The Netherelands, 2012. (Available from: http://arno.unimaas.nl/show.cgi?fid=24959), Accessed Jan 2012. 23. Moscicki JT. Understanding and mastering dynamics in computing grids. Ph.D Thesis, University of Amsterdam: The Netherelands, 2011. (Available from: http://dare.uva.nl/en/record/373814), Access on: January 2012.

Copyright © 2013 John Wiley & Sons, Ltd.

Concurrency Computat.: Pract. Exper. (2013) DOI: 10.1002/cpe

Lihat lebih banyak...

Comentários

Copyright © 2017 DADOSPDF Inc.