Scaling Batch Processing on Cloud Compute with Runhouse and SkyPilot
Parallelizing work across cloud compute is clunky and manual. It doesn't have to be.
CEO @ Runhouse
You have a Python function that works for one sample of data, and now you need to parallelize it to thousands, millions, or billions of samples across cloud VMs. You'd think this would be the "hello world" killer app of having a cloud account, and yet, it's not trivial. You'd either need to set up dedicated batch infrastructure (e.g. Ray, Spark, Dask) or learn your way around some cloud-specific batch processing system. Your job has suddenly grown from a Python function to include manual setup steps, config files, and DSLs, and soon you'll be documenting the steps for others to reproduce it. Your flexibility to cost-optimize is also now limited by the batch system, so utilizing spot compute, multiple cloud accounts, or idle existing compute (e.g. sitting in Kubernetes clusters) is often off the table.
Cloud Compute at your Fingertips
The Runhouse team has been working with the SkyPilot team since late 2022 to provide a magical cloud programming experience which makes your compute natively programmable. It allows you to fully specify your cloud program in Python, from optimizing and allocating the compute (via SkyPilot), to dispatching and executing your Python functions and classes on it (via Runhouse), taking advantage of whatever heterogeneous infrastructure you like. With Runhouse and SkyPilot, horizontally or vertically scaling your Python code is easy, with no manual setup or configs needed to keep track of alongside your code, no cloud-specific systems to learn or migrate your code to, and total flexibility to optimize cost.
A simple example
Below is a function I want to parallelize: it takes in a URL and returns a dictionary with counts for each word on the page. Let's walk through the code to parallelize it on cloud VMs with Runhouse and SkyPilot. You can also view and run the full code here. It's exactly what you'd hope "hello world" cloud parallelism would look like.
```python
import runhouse as rh
import requests
from bs4 import BeautifulSoup

def word_counts(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    words = soup.get_text().split()
    return {word: words.count(word) for word in words}
```
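As an aside, the dictionary comprehension above recounts each word for every occurrence (quadratic in the number of words); `collections.Counter` builds the same mapping in one pass, which may matter on large pages. A possible alternative, not part of the original example:

```python
from collections import Counter

def word_counts_fast(text):
    # One-pass tally; equivalent to {w: words.count(w) for w in words}.
    words = text.split()
    return dict(Counter(words))

print(word_counts_fast("the cat and the hat"))  # {'the': 2, 'cat': 1, 'and': 1, 'hat': 1}
```

Either version plugs into the dispatch pattern below unchanged.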
Next, we specify the desired compute. SkyPilot uses your local cloud credentials to launch the desired VMs, and finds the cheapest available compute across your accounts.
```python
if __name__ == "__main__":
    cluster = rh.cluster(
        name="rh-4x2CPU",
        instance_type="CPU:2",
        num_instances=4,
        spot=True,
    ).up_if_not()
```
SkyPilot's rich launcher allows you to allocate from any provider, region, instance type, or existing compute (e.g. K8s) you like, and you can easily create multiple clusters to utilize a mix. Each is a separate Ray cluster, so you don't need to worry about the complexity of coupling them all together (networking, auth, failures, etc.). For example, in the code above we've launched a four-node spot cluster for simplicity, but we could also bring up four separate one-node spot clusters in a for loop so we don't need to worry about a preemption bringing down the other nodes.
We'll send the `word_counts` function to our remote cluster(s) multiple times to create worker replicas of the function, which execute in parallel. Note that Runhouse does not serialize the function to send it to the cluster (which creates a lot of versioning mayhem), but rather syncs the code over and imports it from within a webserver on the cluster, just as you would if you deployed it manually through a serving framework like Flask or FastAPI.
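To make that contrast concrete, here is a minimal stdlib sketch (not Runhouse's actual implementation) of the serve-and-import pattern: the function lives on the server side, and only arguments and JSON results cross the wire. The endpoint shape and the text-based stand-in function are illustrative assumptions:

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs

def word_counts(text):
    # Local stand-in for the original function (takes raw text, not a URL).
    words = text.split()
    return {w: words.count(w) for w in words}

class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        # The server imports and calls the function; the client never
        # receives serialized code, only arguments and results.
        query = parse_qs(urlparse(self.path).query)
        result = word_counts(query.get("text", [""])[0])
        body = json.dumps(result).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet

server = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = any free port
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

with urllib.request.urlopen(f"http://127.0.0.1:{port}/?text=a+b+a") as resp:
    print(json.load(resp))  # {'a': 2, 'b': 1}
```

Runhouse automates this plumbing for you, along with the code syncing, environments, and auth described below.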
We're also specifying the environment to send each function into, including dependencies and local code. Each env in Runhouse is actually in a separate process on the cluster, so sending our replicas into separate envs will parallelize them. Env processes are always created on the head node by default, so we also specify compute resources here so the processes are spread out across multiple nodes in the cluster (we could also send each to a specified node for finer control).
```python
NUM_REPLICAS = 8
workers = []
for i in range(NUM_REPLICAS):
    env = rh.env(
        name=f"count_env_{i}",
        reqs=["bs4", "requests"],
        compute={"CPU": 1},
    )
    worker_fn = rh.function(word_counts).to(cluster, env=env, name=f"word_counts_{i}")
    workers.append(worker_fn)
```
We now have a list of local Python callables which behave just like our original function, but forward calls to the remote replica over HTTP. All that's left to do is call them in parallel over a list of URLs.
```python
import time
from concurrent.futures import ThreadPoolExecutor

urls = [
    "https://en.wikipedia.org/wiki/Python_(programming_language)",
    "https://en.wikipedia.org/wiki/Python_(genus)",
    "https://en.wikipedia.org/wiki/Python_(mythology)",
    "https://en.wikipedia.org/wiki/Python_(painter)",
    "https://en.wikipedia.org/wiki/Python_(Efteling)",
    "https://en.wikipedia.org/wiki/Python_(automobile_maker)",
    "https://en.wikipedia.org/wiki/Python_(nuclear_primary)",
    "https://en.wikipedia.org/wiki/Python_(missile)",
    "https://en.wikipedia.org/wiki/Python_(codename)",
    "https://en.wikipedia.org/wiki/Python_(film)",
]

with ThreadPoolExecutor(max_workers=len(workers)) as executor:

    def call_with_round_robin(*args):
        # Wait for a free worker, borrow it, call it, then return it.
        while not workers:
            time.sleep(0.25)
        worker = workers.pop(0)
        result = worker(*args)
        workers.append(worker)
        return result

    all_counts = executor.map(call_with_round_robin, urls)
```
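The round-robin pooling trick is independent of Runhouse; here's the same pattern, self-contained, with plain local functions standing in for the remote workers (and a try/except guarding the small race between checking the pool and popping from it):

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the remote callables: each "worker" just counts words,
# so the pooling logic can be exercised entirely locally.
workers = [lambda text: len(text.split()) for _ in range(4)]

def call_with_round_robin(*args):
    while True:
        try:
            worker = workers.pop(0)  # borrow a free worker (atomic under the GIL)
            break
        except IndexError:
            time.sleep(0.25)  # all workers busy; wait for one to be returned
    try:
        return worker(*args)
    finally:
        workers.append(worker)  # return the worker to the pool

inputs = ["one two", "three", "four five six"]
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(call_with_round_robin, inputs))
print(results)  # [2, 1, 3]
```

`executor.map` preserves input order, so results line up with their inputs even though the workers finish at different times.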
Two Crucial Advantages: Flexibility and Reproducibility
Runhouse+SkyPilot allows you to write any hyperscale cloud program you can imagine in Python, realizing the vision of Sky-native application development. It's easy to extend this basic example to scale and cost-optimize any which way:
- Vertically - bigger nodes and accelerators; greedy mixes of VM types for different size batches; using vectorized or distributed libraries from within your dispatched code (PyTorch, Jax, Ray, Spark, etc.)
- Horizontally - more nodes or clusters; using a mix of clouds or Kubernetes; various price models (spot, on-demand, reserve)
- Concurrently - more replicas per node; interleaving IO with computation; using map/reduce steps or blob storage to reduce roundtrips
- Fault-tolerantly - failure handling, preemption, caching, retries
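As one concrete instance of the fault-tolerance bullet, here's a hedged sketch of a retry-with-backoff wrapper you might place around each worker call (the names are illustrative, not a Runhouse API):

```python
import time

def with_retries(fn, attempts=3, delay=0.1):
    # Retry a flaky callable a fixed number of times with exponential
    # backoff; re-raise the last error if every attempt fails.
    def wrapped(*args, **kwargs):
        for attempt in range(attempts):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == attempts - 1:
                    raise
                time.sleep(delay * (2 ** attempt))
    return wrapped

# Example: a worker that fails twice (e.g. a transient network error),
# then succeeds on the third attempt.
calls = {"n": 0}
def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return x * 2

print(with_retries(flaky)(21))  # 42
```

The same decorator could wrap `call_with_round_robin` from the example above so a preempted or flaky replica doesn't fail the whole batch.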
Our users have seen cost reductions of ~50% out of the gate due to this greater flexibility, while unlocking new use cases through the improved local iteration experience of their workflows.
Using SkyPilot with Runhouse also greatly improves reproducibility by keeping the entire job inside Python. Normally, scaling and cost optimization add operational complexity, meaning that others may struggle to recreate the setup and launch sequence, or that the job becomes incompatible with certain execution or DevOps flows. Everyone has at some point inherited a "here's how you run our code" doc, or worse, been on a team where only certain people can run the jobs. Reproducibility across execution systems is even more pernicious: existing orchestration or DevOps tooling can't have new steps outside the code proper shoehorned in, and can't always be cleanly extended to new infrastructure (e.g. distributed systems, multi-region, multi-cloud, specific instance types).
Cloud programming through Runhouse+SkyPilot takes advantage of the one invariant which is true across all these execution systems and users: running Python. Because the job fully fits inside Python, it can go anywhere Python can go - containers, notebooks, Airflow, serverless, new laptops, old laptops, IDEs, CI/CD, etc. When a new hire joins the team, running your batch job is just a matter of setting up cloud credentials and running your Python script. When you want to cost-optimize the evaluation step in your ML pipeline by using a different cloud, you pull down the code, send the evaluation to the other cloud, test locally, and then push your changes to prod normally.
Conclusion
Sometimes you don't need to adopt complex new infrastructure to do your batch processing; you just need native access to bigger compute. Whether you're processing genomes, images, transactions, or clicks, your cloud parallel processing should be fast, cost-effective, and straightforward. Using SkyPilot with Runhouse is a powerful way to cost-optimize, scale, or simplify your existing batch jobs, or unblock new ones in only a few days.
Stay up to speed
Subscribe to our newsletter to receive updates about upcoming Runhouse features and announcements.