Ray: Distributed Computing for All, Part 1

This is the first of a two-part series on distributed computing using Ray. This part shows how to use Ray on your local PC, and part 2 shows how to scale Ray on multi-server cloud clusters.
You've got a new 16-core laptop or desktop, and you're ready to test its capabilities with some serious math.
You're a Python programmer, even if you're not yet an expert, so you open up your favorite LLM and ask it something like this.
“I would like to count the number of prime numbers within a given input range. Please provide me the Python code for this.”
After a few seconds, the LLM gives you some code. You tweak it a bit through some back and forth, and you end up with something like this:
import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")
You run the program and it works perfectly. The only problem is that it takes quite a while to run, maybe thirty to sixty seconds, depending on the size of your input range. That's probably not acceptable.
What do you do now? You have several options, the three most common of which are likely to be:
– Parallelize the code using threads or multiprocessing
– Rewrite the code in a “fast” language like C or Rust
– Try a library like Cython, Numba or NumPy
These are all viable options, but each has its own problems. Options 1 and 3 greatly increase the complexity of your code, and the middle option may require you to learn a new programming language.
What if I told you there is another way? One where the changes required to your existing code will be kept to a minimum. One where your runtime is automatically spread across all your available cores.
That's exactly what the third-party Ray library promises to do.
What is Ray?
The Ray Python library is an open-source distributed computing framework designed to make it easy to scale Python programs from a laptop to a cluster with minimal code changes.
Ray makes it easy to scale and distribute compute workloads, from deep learning to data processing, across remote clusters, while also delivering runtime performance improvements on your laptop, desktop, or even a remote cloud-based cluster.
Ray provides a rich collection of libraries and integrations built on a flexible distributed framework, making distributed computing simple and accessible to all.
In short, Ray allows you to parallelize and distribute your Python code with minimal effort, whether it's running locally on a laptop or on a large cloud-based cluster.
Using Ray
Throughout this article, I'll walk you through the basics of using Ray to accelerate CPU-intensive Python code, and we'll set up example code snippets to show you how easy it is to incorporate the power of Ray into your workload.
To get the most out of Ray, especially if you're a data scientist or machine learning engineer, there are a few key concepts you need to understand first. Ray is made up of several components.
Ray Data is a scalable library designed for data processing in ML and AI applications. It provides flexible, high-performance APIs for AI tasks, including batch inference, data preprocessing, and ML training data ingestion.
Ray Train is a flexible, scalable library designed for distributed machine learning training and fine-tuning.
Ray Tune is used for hyperparameter tuning.
Ray Serve is a scalable library for serving models to support online inference APIs.
Ray RLlib is used for scalable reinforcement learning.
As you can see, Ray is very focused on large language models and AI applications, but there is one last important part that I haven't mentioned yet, and it's the one I'm going to use in this article.
Ray Core is designed to scale CPU-intensive Python programs. It spreads your Python workload over all available cores on whatever system you're running on.
This article will specifically talk about Ray Core.
Two important concepts to grasp within Ray Core are tasks and actors.
Tasks are stateless workers or services that are implemented in Ray by decorating standard Python functions.
Actors (or stateful workers) are used, for example, if you need to track and maintain the state of dependent variables across your distributed cluster. Actors are implemented by decorating standard Python classes.
Both actors and tasks are defined using the same @ray.remote decorator. Once defined, they are invoked using the special .remote() method provided by Ray. We will look at an example of this next.
Setting up the development environment
Before we start coding, we should set up a development environment to keep our projects isolated from each other. I'll use conda for this, but feel free to use any tool you like. I will be running my code using a Jupyter notebook in the WSL2 Ubuntu shell on Windows.
$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ pip install "ray[default]"
Example code – calculating prime numbers
Let's revisit the example I gave earlier: counting the primes between 10,000,000 and 20,000,000.
We will run our original Python code and time how long it takes.
import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")
And the result?
CPUs~32, chunks=64
total=606028, time=31.17s
Now, can we improve on that using Ray? Yes, by following this simple four-step process.
Step 1 – Initialize Ray. Add these two lines to the beginning of your code.
import ray
ray.init()
Step 2 – Create our remote task. That's easy. Just decorate the function we want to speed up with the @ray.remote decorator. The function to decorate is the one that does the most work. In our example, that's the count_primes function.
@ray.remote(num_cpus=1)
def count_primes(start: int, end: int) -> int:
    ...
    ...
Step 3 – Launch the parallel tasks. Call your remote function using Ray's .remote() method.
refs.append(count_primes.remote(s, e))
Step 4 – Wait for all our tasks to finish. Each remote call in Ray returns an ObjectRef. This is a promise from Ray: it means that Ray has scheduled the task to run remotely and will deliver its return value at some point in the future. We collect the values behind all the ObjectRefs returned by the remote calls using the ray.get() function. This blocks until all tasks are completed.
results = ray.get(refs)
Let's put this all together. As you'll see, the changes to our original code are minimal – just four lines of code added and a print statement to show the number of nodes and cores we're running on.
import math
import time
# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init()

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1)  # pure-Python loop -> 1 CPU per task
def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = int(ray.cluster_resources().get("CPU", 1))
    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks
    print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    refs = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        # -----------------------------------------
        # Change No. 3
        # -----------------------------------------
        refs.append(count_primes.remote(s, e))
    # -----------------------------------------
    # Change No. 4
    # -----------------------------------------
    total = sum(ray.get(refs))
    print(f"total={total}, time={time.time() - t0:.2f}s")
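The script splits the range [A, B) into max(4, total_cpus * 2) chunks, with the last chunk absorbing any remainder. The sketch below pulls that arithmetic into a helper so the boundaries can be checked on their own. The names chunk_bounds and covers_exactly are hypothetical and do not appear in the script.

```python
def chunk_bounds(a: int, b: int, chunks: int) -> list[tuple[int, int]]:
    """Split the half-open range [a, b) into `chunks` contiguous pieces."""
    step = (b - a) // chunks
    bounds = []
    for i in range(chunks):
        s = a + i * step
        # The last chunk absorbs any remainder so the pieces end exactly at b
        e = s + step if i < chunks - 1 else b
        bounds.append((s, e))
    return bounds

def covers_exactly(bounds: list[tuple[int, int]], a: int, b: int) -> bool:
    """True if the pieces start at a, end at b, and leave no gaps or overlaps."""
    starts_ok = bounds[0][0] == a and bounds[-1][1] == b
    contiguous = all(bounds[i][1] == bounds[i + 1][0] for i in range(len(bounds) - 1))
    return starts_ok and contiguous

# Same figures as the run above: 32 CPUs -> 64 chunks over 10M..20M
bounds = chunk_bounds(10_000_000, 20_000_000, 64)
print(len(bounds), bounds[0], bounds[-1], covers_exactly(bounds, 10_000_000, 20_000_000))

# A range that does not divide evenly: the last chunk is longer
print(chunk_bounds(0, 10, 4))
```

Using about twice as many chunks as CPUs gives the scheduler some slack when chunks take uneven amounts of time. That reading of the comment in the script is an assumption rather than something the code states.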
Now, has it all been worth it? Let's run the new code and see what we get.
2025-11-01 13:36:30,650 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s
Well, the result speaks for itself. The Ray version is roughly 10x faster than the standard Python code. Not too shabby.
Where does this increase in speed come from? Well, Ray can distribute your work across all the cores in your system. A core is like a mini-CPU. When we ran our original Python code, it only used one core. That's fine, but if your CPU has more than one core, which most modern PCs do, then you're leaving money on the table, so to speak.
In my case, the CPU has 24 cores (reported as 32 logical CPUs in the output above), so it's no surprise that my Ray code was much faster than the non-Ray code.
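To put a number on "much faster", here is a small sketch of the arithmetic using the two timings reported above. The efficiency figures assume 24 physical cores and the 32 logical CPUs shown in the program output. Both timings start after ray.init(), so Ray's startup cost is not included.

```python
serial_s = 31.17      # plain Python run, from the output above
ray_s = 3.04          # Ray run, from the output above
physical_cores = 24   # as stated in the article
logical_cpus = 32     # the "CPUs~32" figure printed by both scripts

speedup = serial_s / ray_s
# Parallel efficiency: the fraction of the ideal N-fold speedup actually achieved
eff_physical = speedup / physical_cores
eff_logical = speedup / logical_cpus

print(f"speedup: {speedup:.2f}x")
print(f"efficiency vs {physical_cores} cores: {eff_physical:.0%}")
print(f"efficiency vs {logical_cpus} logical CPUs: {eff_logical:.0%}")
```

An efficiency well below 100% is normal here: task scheduling, uneven chunk run times, and hardware threads sharing a physical core all take their cut.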
Monitoring Ray's activities
Another point worth noting is that Ray makes it very easy to monitor the execution of jobs using a dashboard. Notice that in the output we got when running our Ray example code, we saw this:
... -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
It shows a local URL because I'm running this on my desktop. If you were running this on a cluster, the URL would point to the cluster head node.
When you open that URL, you should see the Ray dashboard.
From this main screen, you can drill down to monitor many aspects of your Ray programs using the menu links at the top of the page.
Using Ray actors
I mentioned earlier that actors are an important part of Ray Core. Actors are used to coordinate and share data between Ray tasks. For example, say you want to set a global limit that ALL running tasks must adhere to. Let's say you have a bunch of worker tasks, but you want to ensure that, between them, they make no more than five calls per second. Here is some code that you might think would work.
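# A sketch of the global-variable approach (mirrors the actor version later in this article)
import time, ray
ray.init(ignore_reinit_error=True, log_to_driver=False)

GLOBAL_QPS = 5.0                 # intended cluster-wide cap
INTERVAL = 1.0 / GLOBAL_QPS
next_time = time.time()          # "global" pacing state

def acquire():
    # Wait until the next free slot, then reserve the one after it
    global next_time
    now = time.time()
    if now < next_time:
        time.sleep(next_time - now)
    next_time = max(next_time + INTERVAL, time.time())

@ray.remote
def call_api_with_limit(n_calls: int) -> int:
    done = 0
    for _ in range(n_calls):
        # Wait for "global" permission
        acquire()
        # pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS
    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0
    print(f"Total calls: {total_calls}")
    print(f"Intended GLOBAL_QPS: {GLOBAL_QPS}")
    print(f"Expected time if truly global-limited: ~{expected_min_time:.2f}s")
    print(f"Actual time with 'global var' (broken): {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f} (should have been ~{GLOBAL_QPS})")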
We've used global variables to limit the call rate, and the code is syntactically correct and runs without error. Unfortunately, you will not get the result you expected. That's because each Ray task runs in its own worker process and has its own copy of the global variable. A global variable is NOT shared between tasks. So if we run the above code, we will see output like this:
Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)
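The same effect can be seen without Ray. The sketch below uses the standard library's multiprocessing module instead, as a stand-in for Ray's worker processes. The names COUNTER, bump and bump_in_workers are hypothetical. Each child process increments its own copy of the global, and the parent's copy never changes.

```python
import multiprocessing as mp

COUNTER = 0  # module-level "global" that we might hope all workers share

def bump(_):
    """Increment this process's own copy of COUNTER and return it."""
    global COUNTER
    COUNTER += 1
    return COUNTER

def bump_in_workers(n_calls: int, n_procs: int = 2) -> list:
    """Run bump() n_calls times across n_procs separate worker processes."""
    with mp.Pool(n_procs) as pool:
        return pool.map(bump, range(n_calls))

if __name__ == "__main__":
    child_values = bump_in_workers(8)
    # Each child counted its own calls only; the exact split can vary per run
    print("values seen inside the workers:", child_values)
    # The parent's copy was never touched by the children
    print("COUNTER in the parent process:", COUNTER)
```

Whatever the children report, the parent still prints 0. Ray tasks behave the same way because they also run in separate worker processes.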
To fix this, we use an actor. Remember that an actor is a Python class decorated with @ray.remote. Here is the code using an actor.
import time, ray
ray.init(ignore_reinit_error=True, log_to_driver=False)

# This is our actor
@ray.remote
class GlobalPacer:
    """Serialize calls so cluster-wide rate <= qps."""
    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self.next_time = time.time()

    def acquire(self):
        # Wait inside the actor until we can proceed
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Reserve the next slot; guard against drift
        self.next_time = max(self.next_time + self.interval, time.time())
        return True

@ray.remote
def call_api_with_limit(n_calls: int, pacer):
    done = 0
    for _ in range(n_calls):
        # Wait for global permission
        ray.get(pacer.acquire.remote())
        # pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH = 20
    GLOBAL_QPS = 5.0  # cluster-wide cap
    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS
    pacer = GlobalPacer.remote(GLOBAL_QPS)
    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0
    print(f"Total calls: {total_calls}")
    print(f"Global QPS cap: {GLOBAL_QPS}")
    print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
    print(f"Actual time with actor: {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")
Our pacing code is now wrapped in a class (GlobalPacer) decorated with ray.remote, which means a single shared instance applies to all running tasks. We can see the difference this makes in the output from the updated code.
Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0
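Both timings can be roughly predicted with a little arithmetic. The sketch below assumes a simplified pacer model in which the first call passes immediately and each later call waits 1/qps seconds. The helper name paced_duration is hypothetical.

```python
def paced_duration(n_calls: int, qps: float) -> float:
    """Seconds needed to push n_calls through ONE pacer.

    Simplifying assumption: the first call goes through immediately and each
    later call waits 1/qps seconds after the previous one.
    """
    return (n_calls - 1) / qps

NUM_WORKERS, CALLS_EACH, GLOBAL_QPS = 10, 20, 5.0
total_calls = NUM_WORKERS * CALLS_EACH

# Global-variable version: every worker paces only its own 20 calls, in parallel
per_worker_s = paced_duration(CALLS_EACH, GLOBAL_QPS)
# Actor version: all 200 calls queue through the single shared pacer
shared_s = paced_duration(total_calls, GLOBAL_QPS)

print(f"private pacers: ~{per_worker_s:.1f}s, ~{total_calls / per_worker_s:.1f} calls/s overall")
print(f"shared pacer:   ~{shared_s:.1f}s, ~{total_calls / shared_s:.1f} calls/s overall")
```

Under this model, ten private pacers finish in about 3.8 seconds at roughly 52.6 calls per second overall, while one shared pacer needs about 39.8 seconds. Both figures are close to the measured 3.80s and 39.86s.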
Summary
This article introduced Ray, an open-source Python framework that makes it easy to scale compute-intensive programs from a single core to multiple cores or even a cluster with minimal code changes.
I briefly mentioned the key components of Ray (Ray Data, Ray Train, Ray Tune, Ray Serve, and Ray Core), emphasizing that Ray Core is ideal for general-purpose CPU scaling.
I explained some important concepts in Ray Core, such as tasks (stateless parallel functions), actors (stateful workers for shared coordination), and ObjectRefs (a future promise of a task's return value).
To demonstrate the advantages of using Ray, I started with a simple CPU-intensive example, counting prime numbers over a range, and showed how running it on a single core can be slow with a naive Python implementation.
Instead of rewriting the code in another language or using complex libraries, Ray lets you parallelize the workload in just four easy steps and just a few extra lines of code:
- ray.init() to start Ray
- Decorate your functions with @ray.remote to turn them into remote tasks
- .remote() to launch tasks in parallel
- ray.get() to collect the task results
This approach reduced the runtime of the prime-counting example from ~30 seconds to ~3 seconds on a 24-core machine.
I also mentioned how easy it is to monitor Ray's performance using the built-in dashboard and showed how to access it.
Finally, I gave an example of using a Ray actor by showing why global variables are not suitable for coordinating across tasks, since each worker has its own memory space.
In the second part of this series, we will see how to take things to the next level by enabling Ray jobs to use even more CPU power on large multi-node clusters in the cloud with Amazon Web Services.



