iron_worker_python is the Python language binding for IronWorker.
IronWorker is a massively scalable background processing system. See How It Works.
To start using iron_worker_python, you need to sign up and get an OAuth token.
- Go to http://iron.io/ and sign up.
- Get an OAuth Token at http://hud.iron.io/tokens
The recommended way to install iron_worker_python is through pip or easy_install. The package name is iron-worker:
$ easy_install iron-worker
For pip:
$ pip install iron-worker
If you don't want to use pip or easy_install, you can always install from source. First, you'll need iron_core_python. Download that; the file you're after is named iron_core.py. Then, download the iron_worker_python library. The file you're after is named iron_worker.py. As long as both iron_core.py and iron_worker.py are in a directory in the import path, you're all set.
Including the library is easy:
from iron_worker import *
iron_worker_python follows the standard configuration convention used by the other official libraries.
Create a file in the root of your project named "iron.json". You'll need your project ID and OAuth token. You can get them from the HUD. Include them in the iron.json file as follows:
{
  "project_id": "Your_Project_ID",
  "token": "Your_OAuth_Token"
}
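You don't have to use an iron.json file; you can also pass the credentials directly when constructing the client, as the master/slave example later in this document does. A minimal sketch, using the same placeholder values as above:
from iron_worker import *

# A sketch: pass credentials explicitly instead of relying on iron.json.
# Replace the placeholder strings with your own project ID and OAuth token.
worker = IronWorker(project_id="Your_Project_ID", token="Your_OAuth_Token")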
Workers are just Python scripts that are run in the IronWorker cloud. Write them the way you would write any Python script.
Here's an example worker:
print "Hello Python World!\n"
Iron.io has a command line interface to the IronWorker service that makes working with the service a lot easier and more convenient. It does, however, require you to have Ruby 1.9+ installed and to install the iron_worker_ng gem. Once Ruby 1.9+ is installed, you can just run the following command to get the gem:
$ gem install iron_worker_ng
.worker files are a simple way to define your worker and its dependencies. Save the following in a file called HelloWorld.worker:
# set the runtime language; this should be "python" for Python workers
runtime "python"
# exec is the file that will be executed when you queue a task
exec "hello.py"
Once you have your configuration file, your .worker file, and the gem in place, you can run iron_worker upload HelloWorld (if your .worker file is named HelloWorld.worker) to upload your worker.
To run your code, you need to queue a task against it.
worker = IronWorker()
task = worker.queue(code_name="HelloWorld")
That will queue a task against the CodePackage with the name "HelloWorld". To pass a payload, just pass the data to worker.queue. It will be JSON-serialised and passed into your worker at runtime:
worker = IronWorker()
task = worker.queue(code_name="HelloWorld", payload={"fruits": ["apples", "oranges", "bananas"], "best_song_ever": "Call Me Maybe"})
If you'd like to reuse Tasks or do more complex things with them, you can also instantiate them as instances of the Task class, then pass them to the worker.queue method (this is actually what worker.queue does, transparently):
worker = IronWorker()
task = Task(code_name="HelloWorld")
task.payload = {
    "fruits": ["apples", "oranges", "bananas"],
    "best_song_ever": "Call Me Maybe"
}
response = worker.queue(task)
If you'd like to, you can even set your task to run after a delay:
task = Task(code_name="HelloWorld")
task.payload = {
    "fruits": ["apples", "oranges", "bananas"],
    "best_song_ever": "Call Me Maybe"
}
task.delay = 300 # start this task in 300 seconds (5 minutes)
response = worker.queue(task)
- priority: Setting the priority of your job. Valid values are 0, 1, and 2. The default is 0.
- timeout: The maximum runtime of your task in seconds. No task can exceed 3600 seconds (60 minutes). The default is 3600 but can be set to a shorter duration.
- delay: The number of seconds to delay before actually queuing the task. Default is 0.
- label: Optional text label for your task.
- cluster: The cluster name, e.g. "high-mem" or "dedicated". This is a premium feature giving customers access to more powerful or custom-built worker solutions. Dedicated worker clusters exist for users who want to reserve a set number of workers just for their queued tasks. If not set, it defaults to "default", the public IronWorker cluster. A short sketch combining several of these options follows this list.
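Here's a sketch that sets several of these options on one Task, following the same attribute pattern as the delay example above; the values are arbitrary, and cluster only applies if your account has access to a premium cluster:
worker = IronWorker()

task = Task(code_name="HelloWorld")
task.priority = 1       # medium priority
task.timeout = 1800     # stop the task if it runs longer than 30 minutes
task.delay = 60         # wait a minute before queuing
task.label = "nightly-run"
# task.cluster = "high-mem"  # premium feature; uncomment only if your account has it
response = worker.queue(task)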
If you'd like to run a task at a specific time, or set a task to be run repeatedly, you want to create a scheduled task. Unlike previous versions of iron_worker_python, we've unified tasks and scheduled tasks into the same interface. iron_worker_python will automatically detect when you want to create a scheduled task and react accordingly.
task = Task(code_name="HelloWorldRepeating")
task.payload = {
    "fruits": ["apples", "oranges", "bananas"],
    "best_song_ever": "Call Me Maybe"
}
task.run_every = 300 # The task will run every 300 seconds (5 minutes)
task.scheduled = True
task.label = "custom_label"
response = worker.queue(task)
Likewise, if you'd like to run a task at a specific time, doing so is easy. Just pass a datetime.datetime object:
task = Task(code_name="HelloFuture")
task.start_at = datetime.now() + timedelta(hours=1) # start in an hour
task.scheduled = True
response = worker.queue(task)
- run_every: The amount of time, in seconds, between runs. By default, the task will only run once. Setting run_every to less than 60 will return a 400 error.
- end_at: The time tasks will stop being queued. Should be an instance of datetime.datetime.
- run_times: The number of times a task will run.
- priority: Setting the priority of your job. Valid values are 0, 1, and 2. The default is 0. Higher values means tasks spend less time in the queue once they come off the schedule.
- start_at: The time the scheduled task should first be run. Should be an instance of datetime.datetime.
- label: Optional label for adding custom labels to scheduled tasks.
- cluster: The cluster name, e.g. "high-mem" or "dedicated". This is a premium feature giving customers access to more powerful or custom-built worker solutions. Dedicated worker clusters exist for users who want to reserve a set number of workers just for their queued tasks. If not set, it defaults to "default", the public IronWorker cluster. A sketch combining several of these scheduling options follows this list.
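For instance, the following sketch schedules a task to run every hour, at most ten times, stopping after a week; the values are arbitrary and reuse the worker variable from the earlier examples:
from datetime import datetime, timedelta

task = Task(code_name="HelloWorldRepeating")
task.scheduled = True
task.run_every = 3600                                 # run once an hour (must be at least 60)
task.run_times = 10                                   # stop after ten runs
task.start_at = datetime.now() + timedelta(hours=1)   # first run in an hour
task.end_at = datetime.now() + timedelta(days=7)      # stop queuing after a week
task.priority = 1
task.label = "hourly-report"
response = worker.queue(task)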
You can specify the priority of a task using the priority field:
task.priority = 0 # default value, lowest priority
task.priority = 1 # medium priority
task.priority = 2 # high priority
The value of the priority field selects the priority queue the task runs in. Valid values are 0, 1, and 2; 0 is the default.
To get the status of a task, you can use the worker.task method.
task = worker.queue('HelloWorld')
details = worker.task(task)
print details.status # prints 'queued', 'complete', 'error' etc.
If you don't have an instance of Task, you can also pass in the task ID. Note, however, that if you do this and you are attempting to retrieve the status of a scheduled task, you need to declare that as well:
task = worker.queue("HelloWorld")
details = worker.task(id=task.id)
print details.status
scheduled_task = worker.queue("HelloWorld", run_every=60, run_count=3) # run this task 3 times, once a minute
scheduled_details = worker.task(scheduled_task.id, scheduled=True)
print scheduled_details.status
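If you want to block until a task finishes, a simple polling loop over worker.task works. This is just a sketch; the only status values confirmed above are 'queued', 'complete', and 'error':
import time

task = worker.queue("HelloWorld")
while True:
    details = worker.task(id=task.id)
    if details.status in ("complete", "error"):  # stop on a terminal status
        break
    time.sleep(5)  # wait a few seconds between polls
print details.status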
Use any function that prints text inside your worker to insert messages into your worker's log. To retrieve a worker's log, use the worker.log method.
import time

task = worker.queue('HelloWorld')
time.sleep(10)
print worker.log(task)
If you don't have an instance of the Task object handy, you can also just use the ID of the task:
task = worker.queue('HelloWorld')
time.sleep(10)
print worker.log(id=task.id)
When your code is executed, it will be passed three program arguments:
- -id - The task id.
- -payload - The filename containing the data payload for this particular task.
- -d - The user-writable directory that can be used while running your job.
Simply open the filename passed by -payload, read its contents, and (if you used iron_worker_python to queue the task) decode the string as JSON:
import sys
import json

payload = None
payload_file = None

# find the -payload argument and grab the filename that follows it
for i in range(len(sys.argv)):
    if sys.argv[i] == "-payload" and (i + 1) < len(sys.argv):
        payload_file = sys.argv[i + 1]
        break

# read the file and decode its contents as JSON
f = open(payload_file, "r")
contents = f.read()
f.close()
payload = json.loads(contents)
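The same pattern extends to the other two arguments. Here's a sketch of a small helper (not part of the library) that collects -id, -payload, and -d into a dict:
import sys

def parse_worker_args(argv):
    """Collect the -id, -payload, and -d arguments IronWorker passes to every task."""
    args = {}
    for flag in ("-id", "-payload", "-d"):
        if flag in argv:
            position = argv.index(flag)
            if position + 1 < len(argv):
                args[flag.lstrip("-")] = argv[position + 1]
    return args

worker_args = parse_worker_args(sys.argv)
print worker_args.get("id")       # the task id
print worker_args.get("payload")  # path to the payload file
print worker_args.get("d")        # user-writable scratch directory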
If you need to queue a slave task from a master task, you should add two more lines to the master's .worker file:
# MasterTask.worker
runtime "python"
exec "master_task.py"
# install iron_worker on the server side
pip 'iron_worker'
remote
After that, you can queue your uploaded slave task from your master task's code:
from iron_worker import *
worker = IronWorker(project_id=your_project_id, token=your_project_token)
task = worker.queue(code_name="SlaveTask")
You can find more documentation here:
- Iron.io Dev Center: Full documentation for Iron.io products.
- Example Workers