ACAI SDK

Project

class acaisdk.project.Project

Bases: object

static create_project(project_id: str, admin_token: str, project_admin: str) → dict

This is the starting point of your ACAI journey.

A project, like its counterpart in GCP, is a bundle of resources. Users, files and jobs are only identifiable once the ACAI system knows which project they belong to.

Use this method to create a project.

Parameters
  • project_id – Name of the project. It must be unique, as it is also the ID ACAI uses to identify the project.

  • admin_token – One token to rule them all. This is the admin token to create new projects.

  • project_admin – A user name for the project administrator.

Returns

{
  "admin_token": "string",
  "project_id": "string",
  "project_admin_name": "string"
}
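
A minimal usage sketch, assuming placeholder values for the project name, root admin token and administrator user name:

>>> from acaisdk.project import Project
>>> r = Project.create_project('MyAwesomeProject',
...                            '****ROOT_ADMIN_TOKEN****',
...                            'project_admin_user')
>>> admin_token = r['admin_token']  # as in the response schema above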

static create_user(project_id: str, admin_token: str, user: str, login: bool = True) → dict

Create a new user for the project.

Parameters
  • project_id – Project ID.

  • admin_token – Use the admin token returned by create_project().

  • user – Name for the new user.

  • login – If True (the default), automatically export the ENV variables and load the new credentials.

Returns

{
  "user_id": 0,
  "user_token": "string"
}
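
For example, reusing the admin token returned by create_project() (names and tokens below are placeholders):

>>> from acaisdk.project import Project
>>> r = Project.create_user('MyAwesomeProject', admin_token, 'alice')
>>> user_token = r['user_token']  # with login=True the credential is also exported to ENV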

Credentials

acaisdk.credentials.login(token) → None

Log in with a new token. The ENV variable is updated automatically, and refresh() is called by default.

Parameters

token – user token.

acaisdk.credentials.refresh() → None

Refresh credentials. Used when a new token is manually added to ENV.

acaisdk.credentials.has_logged_in(token_dict)

Determine whether the user has provided credentials.

This is only a basic check; it does not guarantee that the credentials are valid.

class acaisdk.credentials.Credentials

Bases: object

Almost all interactions between the user and the ACAI backend require a token to identify the user.

Credentials include a project name (or id, they are the same in ACAI) and a token. They are stored inside a configuration file (just like AWS CLI) or as environment variables (again, like AWS CLI).

The easiest way to work with credentials is to store them inside env variables, so that you never need to explicitly call any methods in this class. All API calls will automatically use methods under this class to authenticate with the backend. You just need to do:

>>> import os
>>> os.environ['ACAI_PROJECT'] = 'MyAwesomeProject'
>>> os.environ['ACAI_TOKEN'] = '***************D8S6'

static load()

Load credentials from ENV or from ~/.acai/credentials.

  • Credentials from ENV have higher priority than those from the file.

  • Both the project name and the token must be valid; API calls will fail if either field is empty.

Usage:

>>> Credentials.load()

Or (preferably): Just don’t call this method. It will be called automatically.

All later API calls will be implicitly using this set of credentials.

Returns

Credentials object

static configure(project_name, token) → None

Configure a new project in the local credentials file.

Notice that ENV variables still take priority even if you use this method to store a new set of credentials in the file.

Usage:

>>> Credentials.configure('my_project', '****PROJECT_TOKEN****')
>>> Credentials.load()  # or implicitly load

Credential file is formatted as:

[default]
project_name = test_project

[test_project]
token = ************SD43

[dummy_project]
token = ************3452

Note: Writing to the credentials file is not tested. Use ENV to authenticate.

Returns

None

File

class acaisdk.file.File

Bases: object

static list_dir(directory: str) → List[Dict]

List all files and directories in a remote directory.

“version” denotes the latest version of a file. Notice that the version number is meaningless for directories.

Returns

[
    {
    "path": "train.json",
    "version": 1,
    "is_dir": false
    },
    ...
]
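
For example (the directory path below is a placeholder):

>>> from acaisdk.file import File
>>> for entry in File.list_dir('/my_data/'):
...     print(entry['path'], entry['version'], entry['is_dir'])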

static upload(local_to_remote: Union[Dict[str, str], List[Tuple[str, str]]], results: list = None) → acaisdk.fileset.FilesList

Upload multiple files.

Notice that the method does not deal with conflicting updates. It is up to the user to make sure there are no unintended uploads to the same remote location. Otherwise, multiple versions of the same file path will be created (i.e. you won’t lose any data).

Parameters
  • local_to_remote

    Allows dictionary or list of tuples. E.g.

    {local_path: remote_path, ...} or [(local_path, remote_path), ...]

  • results

    Store the result in another FilesList, for when you are interested in the result but want to chain this method with other methods, e.g.:

    >>> File.upload([('/a', '/b')], []).as_new_file_set()
    

Returns

FilesList object. It’s just a Python list with additional functions for file and file set operations.

[("local_path", "remote_path:version"), ...]

static download(remote_to_local: Dict[str, str]) → None

Download multiple remote files to local.

If version is not specified for remote file, then the latest version will be downloaded by default.

The local path can be a directory, e.g. for an input dict {"/my_acai/b/c.json": "/home/ubuntu/stuff/"}, c.json will be downloaded to /home/ubuntu/stuff/c.json.

Parameters

remote_to_local – {"remote_path:version": "local_path", ...}

Returns

None
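
For example (remote paths, versions and the local directory below are placeholders):

>>> from acaisdk.file import File
>>> File.download({'/my_acai/b/c.json:2': '/home/ubuntu/stuff/',
...                '/my_data/train.json': './data/'})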

class UploadFileMapping

Bases: tuple

Field number 0: A fileset.FilesList for successfully mapped paths.

Field number 1: A list of files that are not accessible (no permission, maybe).

property files_to_upload

Alias for field number 0

property files_ignored

Alias for field number 1

static convert_to_file_mapping(local_paths: List[str], remote_path: str, ignored_paths: List[str] = None) → acaisdk.file.UploadFileMapping

A nice method to make you happy.

Converts local file and directory paths to their corresponding remote paths, so that you do not need to specify local-to-remote path mappings one by one for the upload function.

For a local file system like

/a/b/c/1.txt
/a/b/c/d/2.txt
/a/b/3.txt

Running

convert_to_file_mapping(['/a/b/3.txt', '/a/b/c/'], '/allen/')

will result in a remote file system structure like:

/allen/1.txt
/allen/d/2.txt
/allen/3.txt

Notice that if you are writing to a remote directory, a “/” must be added at the end of the path string, like “/allen/” instead of “/allen”.

Example usage:

File.convert_to_file_mapping(['/a/b/c/'], '/allen/') \
    .files_to_upload \
    .upload() \
    .as_new_file_set('my_training_files')

Notice that the method is not transactional. It does not protect itself from changes to files in local directories.

Returns

File.UploadFileMapping

static list_file_versions(file_name)
static resolve_vague_path(vague_path)

File Set

class acaisdk.fileset.FilesList

Bases: list

as_new_file_set(file_set_name)

Create a file set for a newly uploaded batch.

Usage can be found at File.upload and File.convert_to_file_mapping

upload()

Upload a FilesList to data lake.

Usage can be found at File.convert_to_file_mapping

class acaisdk.fileset.FileSet

Bases: object

static create_file_set(file_set_name: str, remote_entities: list) → dict

Create a file set on a list of remote files or file sets.

Files are denoted the same way as anywhere else. Use the “@” prefix to denote file sets.

Examples:

  1. Create a file set from files

create_file_set("my_new_file_set_name",
                ["/my_data/test.json", "/my_data/a/b.txt:3"])

  2. Create a file set from other file sets

create_file_set("my_new_file_set_name",
                ["@file_set_a:1", "@file_set_b"])

  3. You can also mix files and file sets

create_file_set("my_new_file_set_name",
                ["@file_set_a:1",
                 "/my_data/a/b.txt:3",
                 "@file_set_c"]
                )

Returns

{
  "id": "HotpotQA:1",
  "files": [
    "data/train.json:2"
  ]
}

static list_file_set_content(vague_name) → dict

List all files in a file set.

static resolve_vague_name(vague_name)
static download_file_set(vague_name: str, mount_point: str = None, force: bool = False) → None

Download a file set to local device.

Parameters
  • vague_name – File set name, which can be vague (with or without a version number). The latest version of the file set will be chosen if no version is given.

  • mount_point

    Which local directory to download the file set to. This won’t actually “mount” any device on your local file system, but the behavior is similar to mounting the root directory of the remote file system at the “mount_point” directory.

    e.g. For a file set allen:2 with file

    /allen/1.txt:1
    /allen/d/2.txt:1
    /allen/3.txt:3
    

    calling download_file_set('allen:2', '/local_tmp/') results in a local directory hierarchy of:

    /local_tmp/allen/1.txt
    /local_tmp/allen/d/2.txt
    /local_tmp/allen/3.txt
    

    Notice that the SDK won’t create the mount_point directory for you, but it will create folders inside the mount_point automatically.

  • force – If the local directory has conflicting file names, whether to continue the download anyway.

static list_file_set_versions(file_set_name)
static list_file_sets() → List[str]

Show all file sets under the project.
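
For example, to enumerate file sets and inspect one of them (the file set name below is a placeholder):

>>> from acaisdk.fileset import FileSet
>>> FileSet.list_file_sets()
>>> FileSet.list_file_set_content('my_training_files')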

Job

class acaisdk.job.Job

Run a job on the cloud.

Typical usage (go to example folder for a sample workflow on Jupyter notebook):

command = "mkdir -p ./my_output/ && " \
          "(cat Shakespeare/* | python3 wordcount.py ./my_output/)"
attr = {
    "v_cpu": "0.2",
    "memory": "64Mi",
    "gpu": "0",
    "command": command,
    "container_image": "pytorch/pytorch",
    'input_file_set': 'shakespeare.texts',
    'output_path': './my_output/',
    'code': '/wordcount.zip',
    'description': 'count some words from Shakespeare works',
    'name': 'my_acai_job'
}

Job().with_attributes(attr).register().run()

Variables
  • id – (int) Job id. Not needed for job registration since it does not exist at that point (obviously).

  • name – Job name

  • input_file_set – This input set will be downloaded to the container where the job is executed. If no version given, latest file set is chosen.

  • output_path – Only files written to this output folder in the container will be uploaded and packed into a new file set. Can be a relative path (relative to /acai/ directory).

  • output_file_set – Output file set name. Not needed for job submission since the name is decided by the backend.

  • code – Remote location of the code zip.

  • command – The command with which the code is executed. As this is a shell command, execution is carried out by the default shell in the container. One benefit is that you can make full use of shell syntax such as &&, ; and |.

  • container_image – ID of the Docker image, so that ACAI can pull the image down to the execution host.

  • description – Some description for the job. Just like git commit -m

  • submitted_time – Unix timestamp of the submission time. Not needed for submission.

  • updated_time – Deprecated.

  • v_cpu – (str) Number of virtual CPUs the execution container can have. See doc for with_resources() for recommended usage.

  • memory – (str) The amount of physical memory for the container. Notice that exceeding the limit will result in job failure. See doc for with_resources() for recommended usage.

  • gpu – (str) Number of GPUs to allocate to the container. See doc for with_resources() for recommended usage.

register()

Register the job with the ACAI backend. Only a registered job can be run.

run() → acaisdk.job.Job

Execute the registered job.

with_attributes(d: dict) → acaisdk.job.Job

Fill job object with attributes.

Parameters

d – Dict of attributes to add to the job object.

Returns

Updated job object.

with_resources(vcpu: Union[int, str] = None, gpu: Union[int, str] = None, mem: Union[int, str] = None) → acaisdk.job.Job

A friendlier method for adding resource constraints.

Each of the three parameters can be str or int. For example,

mem=1e9 means a max memory usage of 1e9 bytes (~1 GB)

mem="100Mi" means a max memory usage of 100 MiB

Memory string format is the same as here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-memory
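
A usage sketch, assuming the attr dict from the class-level Job example above:

>>> job = Job().with_attributes(attr)  # attr as in the class-level example
>>> job.with_resources(vcpu="0.2", gpu=0, mem="64Mi").register().run()

Passing mem=64 * 1024 * 1024 instead would request the same 64 MiB as a raw byte count.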

static list_jobs() → List[acaisdk.job.Job]

List all jobs under the current project.

Returns

a list of Job objects

static find(job_id: int) → acaisdk.job.Job

Find a job by job ID.

Parameters

job_id – integer job ID

Returns

Job object of the found job.

static check_job_status(job_id) → acaisdk.job.JobStatus

Check job status by job ID.

Usage:

>>> Job.check_job_status(10)

status() → acaisdk.job.JobStatus

Check the status of the current job.

Usage:

>>> status = Job.find(10).status()

In the meantime, the output file set is updated, but it is only meaningful once the job has finished successfully. You can then access it by:

>>> j = Job.find(123).status()
>>> output_file_set = j.output_file_set

Returns

JobStatus

get_output_file_set()

Get the output file set of the job.

Notice that this method is only safe when the job in question has been submitted and has finished. Otherwise it may raise an exception.

The return value only makes sense when the job has finished successfully.

wait() → acaisdk.job.JobStatus

Block until job finish or fail.

By the way, once wait() returns, the output file set becomes available.

Example:

>>> j = Job().with_attributes({...}).register().run()
>>> if j.wait() == JobStatus.FINISHED:
...     print(j.output_file_set)

Returns

JobStatus

property dict

Get a dictionary representation of the job.

Usage:

>>> j = Job.find(10)
>>> d = j.dict

Just to digress a bit: to get a pretty-formatted string representation of a job, you can simply do:

>>> j = Job.find(10)
>>> print(j)

static from_dict(d: Dict) → acaisdk.job.Job

Wrapper for with_attributes() method. Has the exact same behavior.

static from_json(path) → acaisdk.job.Job

Wrapper for with_attributes() method. For when you want to load the job settings from a JSON file.

static from_yaml(path) → acaisdk.job.Job

Wrapper for with_attributes() method. For when you want to load the job settings from a YAML file.
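
A sketch of loading job settings from a file; job.yaml is a hypothetical file holding the same keys as the attribute dict in the class-level example:

>>> j = Job.from_yaml('./job.yaml')  # hypothetical path
>>> j.register().run()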

class acaisdk.job.JobStatus
Variables
  • QUEUEING

  • LAUNCHING

  • DOWNLOADING

  • RUNNING

  • UPLOADING

  • FINISHED

  • FAILED

  • KILLED

  • CONTAINER_CRASHED

  • UNKNOWN – It seems that the job does not exist

Metadata

class acaisdk.meta.Meta

Bases: object

static update_file_meta(file_path, tags: list = None, kv_pairs: dict = None)

Add new metadata to a file.

Parameters
  • file_path – Can be given without a version; the latest version is used by default.

  • tags – A list of tags; same as adding a “tags” key in kv_pairs.

  • kv_pairs – Key-value pairs of metadata.
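
For example (the file path, tags and key-value pairs below are placeholders):

>>> from acaisdk.meta import Meta
>>> Meta.update_file_meta('/my_data/train.json',
...                       tags=['hotpot', 'cnn'],
...                       kv_pairs={'accuracy': 0.92})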

static update_file_set_meta(file_set, tags: list = None, kv_pairs: dict = None)

Same usage as update_file_meta()

static update_job_meta(job_id, tags: list = None, kv_pairs: dict = None)

Same usage as update_file_meta()

static find_file(*conditions: acaisdk.meta.Condition)

Find a file that meets a list of constraints.

static find_job(*conditions: acaisdk.meta.Condition)

Find a job that meets a list of constraints. Same usage as find_file()

static find_file_set(*conditions: acaisdk.meta.Condition)

Find a file set that meets a list of constraints. Same usage as find_file()

static query_meta(entity_type: str, *conditions: acaisdk.meta.Condition)

Base method for querying files, jobs and file sets.

It is recommended to use find_file(), find_job() and find_file_set() instead.

static get_file_meta(*file_list)

Get the metadata for a list of files.

Directories will be ignored.

Usage:

>>> Meta.get_file_meta('/a/b.txt', '/c.json', '/hotpot/eval.py', ...)
Returns

{'data': [{
            '__create_time__': unix_timestamp,
            '__creator_id__': int,
            '__full_path__': str,
            '__size__': int in bytes,
            '__type__': str,
            '__version__': int,
            '_id': full path with version,
            'my_meta_key1': 'my_meta_value_1',
            'tags': ['hotpot', 'cnn', ...]
           },
           { ... },
           ...
          ],
 'status': 'success'}

static del_file_meta(file_path, tags: list, keys: list)
static del_file_set_meta(file_set, tags: list, keys: list)
static del_job_meta(job_id: int, tags: list, keys: list)
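
These remove tags and metadata keys from a file, a file set and a job respectively. A sketch following the signature above (the path, tag and key below are placeholders):

>>> Meta.del_file_meta('/my_data/train.json',
...                    tags=['obsolete'],
...                    keys=['accuracy'])
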
class acaisdk.meta.Condition(key: str)

Constraints to apply when filtering jobs, files and file sets.

Example: find the job output of the best-performing CNN model by applying multiple Conditions.

Meta.find_file_set(
    Condition('model').value('cnn'),
    Condition('accuracy').max(),
    )

Notice that conditions are applied in an “AND” fashion. “OR” logic is not supported.

value(val) → acaisdk.meta.Condition

Find entities whose value equals val.

max() → acaisdk.meta.Condition

Find entity with maximum value

min() → acaisdk.meta.Condition

Find entity with minimum value

range(start, end) → acaisdk.meta.Condition

Find entity with value ranging from start (exclusive) to end (inclusive)

re() → acaisdk.meta.Condition

Query with a regular expression instead of exact string matching.

Only effective when the condition type is “value” and the value is a string.

Condition('model').value('cnn').re()