Project configuration

Project structure

An earthmover project consists of
- source data to be transformed, such as CSV or TSV files, or a relational database table
- an `earthmover.yml` YAML configuration file that defines your project:
  - `config` options for your project
  - (optional) `definitions` of YAML anchors to be used elsewhere
  - (optional) `packages` to include in your project
  - data `sources` to transform
  - data `transformations` to execute
  - data `destinations` to materialize
- (optional) Jinja templates to be rendered for each row of the final `destination`
Tips
- To quickly see earthmover in action, run `earthmover init` to initialize a simple starter project.
- Example earthmover projects can be found in the example_projects/ folder.
- See the recommended best practices for building an earthmover project.

Within an earthmover project folder, you can simply run `earthmover run` to run the data transformations.

YAML configuration
All the instructions for earthmover — where to find the source data, what transformations to apply to it, and how and where to save the output — are specified in a YAML configuration file, typically named `earthmover.yml`.
Note that `earthmover.yml` may also contain Jinja and/or parameters, which are rendered in an initial compilation step before execution.
version

A `version` label is required in `earthmover.yml` for compatibility reasons. The current version is `2`.
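For example, the top level of an `earthmover.yml` might be structured like this (a minimal sketch; section contents elided):

version: 2

config:
  output_dir: ./output/

sources:
  # ...

transformations:
  # ...

destinations:
  # ...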
config

The `config` section specifies various options for the operation of this tool.
A sample `config` section is shown here; the options are explained below.
config:
output_dir: ./
state_file: ~/.earthmover.csv
log_level: INFO
tmp_dir: /tmp
show_stacktrace: True
show_graph: True
macros: >
{% macro example_macro(value) -%}
prefixed-int-{{value|int}}
{%- endmacro %}
parameter_defaults:
SOURCE_DIR: ./sources/
show_progress: True
git_auth_timeout: 120
| Required? | Key | Type | Description | Default value |
|---|---|---|---|---|
| (optional) | `output_dir` | string | The folder where destinations will be materialized. | `./` (current directory) |
| (optional) | `state_file` | string | The file where tool state is maintained. | `~/.earthmover.csv` on *nix systems, `C:/Users/USER/.earthmover.csv` on Windows systems |
| (optional) | `log_level` | string | The console output verbosity. | `INFO` |
| (optional) | `tmp_dir` | string | The folder to use when Dask must spill data to disk. | `/tmp` |
| (optional) | `show_graph` | boolean | Whether to create `./graph.png` and `./graph.svg` showing the data dependency graph. (Requires PyGraphViz to be installed.) | `False` |
| (optional) | `show_stacktrace` | boolean | Whether to show a stacktrace for runtime errors. | `False` |
| (optional) | `macros` | string | Jinja macros which will be available within any template throughout the project. (This can slow performance.) | (none) |
| (optional) | `parameter_defaults` | dict | Default values to be used if the user fails to specify a parameter. | (none) |
| (optional) | `show_progress` | boolean | Whether to show a progress bar for each Dask transformation. | `False` |
| (optional) | `git_auth_timeout` | integer | Number of seconds to wait for the user to enter Git credentials, if needed, during package installation (see project composition). | `60` |
definitions

The (optional) `definitions` section can be used to define YAML elements which are reused throughout the rest of the configuration. earthmover does nothing special with this section; it is simply interpreted by the YAML parser. However, this can be a very useful way to keep your YAML configuration DRY – rather than redefining the same values, Jinja phrases, etc. throughout your config, define them once in this section and refer to them later using YAML anchors, aliases, and overrides.
An example definitions section, and how it can be used later on, are shown below:
definitions:
operations:
- &student_join_op
operation: join
join_type: left
left_key: student_id
right_key: student_id
...
date_to_year_jinja: &date_to_year "{%raw%}{{ val[-4:] }}{%endraw%}"
...
transformations:
roster:
operations:
- <<: *student_join_op
sources:
- $sources.roster
- $sources.students
enrollment:
operations:
- <<: *student_join_op
sources:
- $sources.enrollment
- $sources.students
...
academic_terms:
operations:
- operation: duplicate_columns
source: $sources.academic_terms
columns:
start_date: school_year
- operation: modify_columns
columns:
school_year: *date_to_year
packages

The (optional) `packages` section can be used to specify packages – other earthmover projects from a local directory or GitHub – to import and build upon existing code. See Project Composition for details and considerations.
A sample `packages` section is shown here; the options are explained below.
packages:
year_end_assessment:
git: https://github.com/edanalytics/earthmover_edfi_bundles.git
subdirectory: assessments/assessment_name
student_id_macros:
local: path/to/student_id_macros
Each package must have a name (which becomes the name of the folder it is installed to under `./packages/`), such as `year_end_assessment` or `student_id_macros` in the above example. Two sources of packages are currently supported:

- GitHub packages: Specify the URL of the repository containing the package. If the package YAML configuration is not in the top level of the repository, include the path to the folder with the optional `subdirectory`.

  Tip: earthmover uses the system's `git` client to clone packages from GitHub. To access non-public packages, the `git` client must have authentication configured separately.

- Local packages: Specify the path to the folder containing the package YAML configuration. Paths may be absolute or relative to the location of the earthmover YAML configuration file.
sources

The `sources` section specifies the source data earthmover will transform.
A sample `sources` section is shown here; the options are explained below.
sources:
districts:
connection: "ftp://user:pass@host:port/path/to/districts.csv"
tx_schools:
connection: "postgresql://user:pass@host/database"
query: >
select school_id, school_name, school_website
from schema.schools
where school_address_state='TX'
courses:
file: ./data/Courses.csv
header_rows: 1
columns:
- school_id
- school_year
- course_code
- course_title
- subject_id
more_schools:
file: ./data/Schools.csv
header_rows: 1
columns:
- school_id
- school_name
- address
- phone_number
expect:
- low_grade != ''
- high_grade != ''
- low_grade|int <= high_grade|int
Each source must have a name (which is how it is referenced by transformations and destinations), such as `districts`, `courses`, `tx_schools`, or `more_schools` in the above example. Currently-supported source types include:
| source type | format | file type | notes |
|---|---|---|---|
| file | row-based | `.csv` | Specify the number of `header_rows`, and (if `header_rows` > 0, optionally) overwrite the column names. Optionally specify an `encoding` to use when reading the file (the default is UTF8). |
| | | `.tsv` | Specify the number of `header_rows`, and (if `header_rows` > 0, optionally) overwrite the column names. Optionally specify an `encoding` to use when reading the file (the default is UTF8). |
| | | `.txt` | A fixed-width text file; see the documentation below for configuration details. |
| | column-based | `.parquet`, `.feather`, `.orc` | These require the `pyarrow` library, which can be installed with `pip install pyarrow` or similar. |
| | structured | `.json` | Optionally specify an `object_type` (`frame` or `series`) and `orientation` (see these docs) to interpret different JSON structures. |
| | | `.jsonl` or `.ndjson` | Files with a flat JSON structure per line. |
| | | `.xml` | Optionally specify an `xpath` to select a set of nodes deeper in the XML. |
| | | `.html` | Optionally specify a `regex` to match for selecting one of many tables in the HTML. This can be used to extract tables from a live web page. |
| | Excel | `.xls`, `.xlsx`, `.xlsm`, `.xlsb`, `.odf`, `.ods`, and `.odt` | Optionally specify the `sheet` name (as a string) or index (as an integer) to load. |
| | other | `.pkl` or `.pickle` | A pickled Python object (typically a Pandas dataframe). |
| | | `.sas7bdat` | A SAS data file. |
| | | `.sav` | An SPSS data file. |
| | | `.dta` | A Stata data file. |
| Database | various | - | Database sources are supported via SQLAlchemy. They must specify a database `connection` string and SQL `query` to run. |
| FTP | various | - | FTP file sources are supported via ftplib. They must specify an FTP `connection` string with the path to the file. |
File type is inferred from the file extension; however, you may manually specify `type:` (`csv`, `tsv`, `fixedwidth`, `parquet`, `feather`, `orc`, `json`, `jsonl`, `xml`, `html`, `excel`, `pickle`, `sas`, `spss`, or `stata`) to force earthmover to treat a file with an arbitrary extension as a certain type. Remote file paths (`https://somesite.com/path/to/file.csv`) generally work.
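For example, a remote file served without a meaningful extension can be forced to be read as CSV (the URL and source name here are illustrative):

sources:
  state_directory:
    file: https://somesite.com/path/to/directory_export
    type: csv
    header_rows: 1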
Fixed-width config

Using a fixed-width file (FWF) as a source requires additional metadata configuring how earthmover should slice each row into its constituent columns. Two ways to provide this metadata are supported:
Provide a colspec_file

Example configuration for a `fixedwidth` source with a `colspec_file`:
sources:
input:
file: ./data/input.txt
colspec_file: ./seed/colspecs.csv # required
colspec_headers:
name: field_name # required
start: start_index # required if `width` is not provided
end: end_index # required if `width` is not provided
width: field_length # required if `start` or `end` is not provided
type: fixedwidth # required if `file` does not end with '.txt'
header_rows: 0
Notes:
- (required) `colspec_file`: path to the CSV containing colspec details
- (required) `colspec_headers`: mapping between the `colspec_file`'s column names and the fields required by earthmover. (Columns may have any name and position.)
  - Only `name` is always required: `colspec_file` must contain a column that assigns a name to each field in the FWF
  - Either `width` or both `start` and `end` are required
    - If `width` is provided, `colspec_file` should include a column of integer values indicating the number of characters in each field of the FWF
    - If `start` and `end` are provided, `colspec_file` should include two columns of integer values giving the extents of the FWF's fields as half-open intervals (i.e., [from, to[)
- (optional) `type`: optional if the source file has a `.txt` extension; otherwise, specify `type: fixedwidth` (there is no standard file extension for FWFs)
- (optional) `header_rows`: usually 0 for FWFs; earthmover attempts to infer this if not specified
Formatting a colspec_file

A `colspec_file` must include a column with field names, and
1. either a column with field widths
2. or two columns with start and end positions
Example of (1): a `colspec_file` with a field-name column and a field-width column, with an `earthmover.yaml` like the sketch below.
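For instance, an illustrative `colspec_file` using widths (reusing the column names from the configuration example above):

field_name, field_length
date, 8
id, 16
score_1, 2
score_2, 2

with an `earthmover.yaml` like:

sources:
  input:
    file: ./data/input.txt
    type: fixedwidth
    header_rows: 0
    colspec_file: ./seed/colspecs.csv
    colspec_headers:
      name: field_name
      width: field_length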
Example of (2):
start_idx, end_idx, other_data, full_field_name, other_data_2
0, 8, abc, date, def
8, 24, abc, id, def
24, 26, abc, score_1, def
26, 28, abc, score_2, def
with an `earthmover.yaml` like:
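(A sketch; the `colspec_headers` map to the column names in the colspec shown above, and the file paths are illustrative.)

sources:
  input:
    file: ./data/input.txt
    type: fixedwidth
    header_rows: 0
    colspec_file: ./seed/colspecs.csv
    colspec_headers:
      name: full_field_name
      start: start_idx
      end: end_idx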
Provide colspecs and columns

Example configuration for a `fixedwidth` source with `colspecs` and `columns`:
sources:
input:
file: ./data/input.txt
type: fixedwidth # required if `file` does not end with '.txt'
header_rows: 0
colspecs: # required
- [0, 8]
- [8, 24]
- [24, 26]
- [26, 28]
columns: # required
- date
- id
- score_1
- score_2
Notes:
- (required) `colspecs`: a list of start and end indices giving the extents of the FWF's fields as half-open intervals (i.e., [from, to[)
- (required) `columns`: a list of column names corresponding to the indices in `colspecs`
source examples
sources:
mydata:
file: ./data/mydata.txt
colspecs:
- [0, 4] # id
- [6, 9] # year
- [10, 20] # code
- ...
columns:
- id
- year
- code
- ...
# or
colspec_file: ./mydata_colspec.csv
# where `mydata_colspec.csv` is a file with columns `col_name`, `start_pos`, and `end_pos`
expectations

For any source, optionally specify conditions you expect the data to meet which, if not true for any row, will cause the run to fail with an error. (This can be useful for detecting and rejecting NULL or missing values before processing the data.) The format must be a Jinja expression that returns a boolean value. This enables casting values (which are all treated as strings) to numeric formats like int and float for numeric comparisons.
sources:
school_directory_grades_only:
file: ./sources/school_directory.csv
header_rows: 1
expect:
- "low_grade != ''"
- "high_grade != ''"
# ^ require a `low_grade` and a `high_grade`
credentials

The Database and FTP examples above show `user:pass` in the connection string, but if you are version-controlling your earthmover project, you must avoid publishing such credentials. Typically this is done via parameters, which may be referenced throughout `earthmover.yml` (not just in the `sources` section) and are parsed at run-time. For example:
sources:
mydata:
connection: "postgresql://${DB_USERNAME}:${DB_PASSWORD}@host/mydatabase"
query: >
select id, year, code, ...
from myschema.mytable
where some_column='some_value'
export DB_USERNAME=myuser
export DB_PASSWORD=mypa$$w0rd
earthmover run
# or
earthmover run -p '{"DB_USERNAME":"myuser", "DB_PASSWORD":"mypa$$w0rd"}'
transformations

The `transformations` section specifies how source data is transformed by earthmover.
A sample `transformations` section is shown here; the options are explained below.
transformations:
courses:
source: $sources.courses
operations:
- operation: map_values
column: subject_id
mapping:
01: 1 (Mathematics)
02: 2 (Literature)
03: 3 (History)
04: 4 (Language)
05: 5 (Computer and Information Systems)
- operation: join
sources:
- $sources.schools
join_type: inner
left_key: school_id
right_key: school_id
- operation: drop_columns
columns:
- address
- phone_number
The above example shows a transformation of the `courses` source. A transformation defines a source to which an ordered list of operations is applied; this source may be an original `$source` or another `$transformation`. Each operation requires further specification depending on its type; the operations are listed and documented below.
Frame operations
union
Concatenates or "stacks" the transformation source with one or more sources of the same shape.
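A minimal sketch, assuming the same `sources` list format as `join` (source names are illustrative):

- operation: union
  sources:
    - $sources.more_schools
    - $sources.even_more_schools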
join
Joins the transformation source with one or more sources.
- operation: join
sources:
- $sources.schools
join_type: inner | left | right
left_key: school_id
right_key: school_id
# or:
left_keys:
- school_id
- school_year
right_keys:
- school_id
- school_year
# optionally specify columns to (only) keep from the left and/or right sources:
left_keep_columns:
- left_col_1
- left_col_2
right_keep_columns:
- right_col_1
- right_col_2
# or columns to discard from the left and/or right sources:
left_drop_columns:
- left_col_1
- left_col_2
right_drop_columns:
- right_col_1
- right_col_2
# (if neither ..._keep nor ..._drop are specified, all columns are retained)
Joining can lead to a wide result; the `..._keep_columns` and `..._drop_columns` options enable narrowing it.
Besides the join column(s), if a column `my_column` with the same name exists in both tables and is not dropped, it will be renamed `my_column_x` and `my_column_y` (from the left and right source, respectively) in the result.
Column operations

Tip: use Jinja!

The `add_columns` and `modify_columns` operations (documented below) support the use of Jinja templating to add/modify values dynamically. In the column definitions for such operations:
- `{{value}}` refers to this column's value
- `{{AnotherColumn}}` refers to another column's value
- Any Jinja filters and math operations should work
- Reference the current row number with `{{__row_number__}}`
- Reference a dictionary containing the row data with `{{__row_data__['column_name']}}`

Jinja expressions must be wrapped in `{%raw%}...{%endraw%}` to avoid them being rendered during the initial compilation step.
add_columns
Adds columns with specified values.
- operation: add_columns
columns:
new_column_1: value_1
new_column_2: "{%raw%}{% if True %}Jinja works here{% endif %}{%endraw%}"
new_column_3: "{%raw%}Reference values from {{AnotherColumn}} in this new column{%endraw%}"
new_column_4: "{%raw%}{% if col1>col2 %}{{col1|float + col2|float}}{% else %}{{col1|float - col2|float}}{% endif %}{%endraw%}"
rename_columns
Renames columns.
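A minimal sketch (column names are illustrative; the mapping is assumed to be old name to new name):

- operation: rename_columns
  columns:
    old_column_name: new_column_name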
duplicate_columns
Duplicates columns (and all their values).
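For example, following the format used in the definitions example above (column names are illustrative):

- operation: duplicate_columns
  columns:
    existing_column: copy_of_existing_column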
drop_columns
Removes the specified columns.
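For example, as in the transformations example above (column names are illustrative):

- operation: drop_columns
  columns:
    - address
    - phone_number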
keep_columns
Keeps only the specified columns, discards the rest.
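A minimal sketch (column names are illustrative):

- operation: keep_columns
  columns:
    - school_id
    - school_name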
combine_columns
Combines the values of the specified columns, delimited by a separator, into a new column.
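A minimal sketch; the `new_column` and `separator` keys are assumptions based on the description above, and column names are illustrative:

- operation: combine_columns
  columns:
    - street_address
    - city
  new_column: full_address
  separator: ", "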
modify_columns
Modify the values in the specified columns.
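For example, using Jinja as described in the tip above (column name and expression are illustrative):

- operation: modify_columns
  columns:
    school_year: "{%raw%}{{value[-4:]}}{%endraw%}"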
map_values
Map the values of a column.
- operation: map_values
column: column_name
# or, to map multiple columns simultaneously
columns:
- col_1
- col_2
mapping:
old_value_1: new_value_1
old_value_2: new_value_2
# or a CSV/TSV with two columns (from, to) and header row
# paths may be absolute or relative paths to the location of the `earthmover` YAML configuration file
map_file: path/to/mapping.csv
date_format
Change the format of a date column.
- operation: date_format
column: date_of_birth
# or
columns:
- date_column_1
- date_column_2
- date_column_3
from_format: "%b %d %Y %H:%M%p"
to_format: "%Y-%m-%d"
ignore_errors: False # Default False
exact_match: False # Default False
The `from_format` and `to_format` must follow Python's strftime() and strptime() formats.
When `ignore_errors` is `True`, empty strings will be replaced with Pandas NaT (not-a-time) datatypes. This ensures column consistency and prevents a mix of empty strings and timestamps.
When `exact_match` is `True`, the operation will only run successfully if the `from_format` input exactly matches the format of the date column. When `False`, the operation allows the format to partially match the target string.
Row operations
distinct_rows
Removes duplicate rows.
Optionally specify the `columns` to use for uniqueness; otherwise all columns are used. If duplicate rows are found, only the first is kept.
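A minimal sketch (column name is illustrative):

- operation: distinct_rows
  columns:
    - student_id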
filter_rows
Filter (include or exclude) rows matching a query.
The query format is anything supported by `pandas.DataFrame.query`. Specifying `behavior` as `exclude` wraps the Pandas `query()` with `not()`.
sort_rows
Sort rows by one or more columns.
By default, rows are sorted in ascending order. Set `descending: True` to reverse this order.
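A minimal sketch (the `columns` key is an assumption; column name is illustrative):

- operation: sort_rows
  columns:
    - school_year
  descending: True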
limit_rows
Limit the number of rows in the dataframe.
(If the dataframe has fewer than `count` rows, they will all be returned.)
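For example:

- operation: limit_rows
  count: 1000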
flatten
Split values in a column and create a copy of the row for each value.
- operation: flatten
flatten_column: my_column
left_wrapper: '["' # characters to trim from the left of values in `flatten_column`
right_wrapper: '"]' # characters to trim from the right of values in `flatten_column`
separator: "," # the string by which to split values in `flatten_column`
value_column: my_value # name of the new column to create with flattened values
trim_whitespace: " \t\r\n\"" # characters to trim from `value_column` _after_ flattening
The defaults above are designed to allow flattening JSON arrays (stored as strings) by specifying only a `flatten_column` and a `value_column`, as in the example below.
Note that for empty string values or empty arrays, a row will still be preserved. These can be removed in a second step with a `filter_rows` operation. Example:
# Given a dataframe like this:
# foo bar to_flatten
# --- --- ----------
# foo1 bar1 "[\"test1\",\"test2\",\"test3\"]"
# foo2 bar2 ""
# foo3 bar3 "[]"
# foo4 bar4 "[\"test4\",\"test5\",\"test6\"]"
#
# a flatten operation like this:
- operation: flatten
flatten_column: to_flatten
value_column: my_value
# will produce a dataframe like this:
# foo bar my_value
# --- --- --------
# foo1 bar1 test1
# foo1 bar1 test2
# foo1 bar1 test3
# foo2 bar2 ""
# foo3 bar3 ""
# foo4 bar4 test4
# foo4 bar4 test5
# foo4 bar4 test6
#
# and you can remove the blank rows if needed with a further operation:
- operation: filter_rows
query: my_value == ''
behavior: exclude
Group operations
group_by
Reduce the number of rows by grouping, and add columns with values calculated over each group.
- operation: group_by
group_by_columns:
- student_id
create_columns:
num_scores: count()
min_score: min(item_score)
max_score: max(item_score)
avg_score: mean(item_score)
item_scores: agg(item_score,;)
Valid aggregation functions are
- `count()` or `size()` - the number of rows in each group
- `min(column)` - the minimum (numeric) value in `column` for each group
- `str_min(column)` - the minimum (string) value in `column` for each group
- `max(column)` - the maximum (numeric) value in `column` for each group
- `str_max(column)` - the maximum (string) value in `column` for each group
- `sum(column)` - the sum of (numeric) values in `column` for each group
- `mean(column)` or `avg(column)` - the mean of (numeric) values in `column` for each group
- `std(column)` - the standard deviation of (numeric) values in `column` for each group
- `var(column)` - the variance of (numeric) values in `column` for each group
- `agg(column,separator)` - the values of `column` in each group are concatenated, delimited by `separator` (the default `separator` is none)
- `json_array_agg(column,[str])` - the values of `column` in each group are concatenated into a JSON array (`[1,2,3]`). If the optional `str` argument is provided, the values in the array are quoted (`["1", "2", "3"]`)
Numeric aggregation functions will fail with errors if used on non-numeric column values.
Note the difference between `min()`/`max()` and `str_min()`/`str_max()`: given a list like `10, 11, 98, 99, 100, 101`, the return values are

| function | return |
|---|---|
| `min()` | 10 |
| `str_min()` | 10 |
| `max()` | 101 |
| `str_max()` | 99 |
Debug operation
debug
Prints out information about the data at the current transformation operation.
- operation: debug
function: head # or `tail`, `describe`, `columns`; default=`head`
rows: 10 # (optional, default=5; ignored if function=describe|columns)
transpose: True # (default=False; ignored when function=columns)
skip_columns: [a, b, c] # to avoid logging PII
keep_columns: [x, y, z] # to look only at specific columns
- `function=head|tail` displays the first or last `rows` rows of the dataframe, respectively. (Note that on large dataframes, these may not truly be the first/last rows, due to Dask's memory optimizations.)
- `function=describe` shows statistics about the values in the dataframe.
- `function=columns` shows the column names in the dataframe.
- `transpose` can be helpful with very wide dataframes.
- `keep_columns` defaults to all columns; `skip_columns` defaults to no columns.
destinations
The `destinations` section specifies how transformed data is materialized to files.
A sample destinations section is shown here; the options are explained below.
destinations:
schools:
source: $transformations.school_list
template: ./json_templates/school.jsont
extension: jsonl
linearize: True
courses:
source: $transformations.course_list
template: ./json_templates/course.jsont
extension: jsonl
linearize: True
course_report:
source: $transformations.course_list
template: ./json_templates/course.htmlt
extension: html
linearize: False
header: <html><body><h1>Course List:</h1>
footer: </body></html>
For each file you want materialized, provide the `source` and (optionally) the Jinja `template` file. The materialized file will contain the template rendered for each row of the source, with an optional `header` prefix and `footer` postfix (both of which may contain Jinja, and which may reference `__row_data__`, the first row of the dataframe; a formulation such as `{%raw%}{% for k in __row_data__.pop('__row_data__').keys() %}{{k}}{% if not loop.last %},{% endif %}{% endfor %}{%endraw%}` may be useful). Files are materialized using your specified `extension` (which is required).
If `linearize` is `True`, all line breaks are removed from the template, resulting in one output line per row. (This is useful for creating JSONL and other linear output formats.) If omitted, `linearize` defaults to `True`.
Global options

Any `source`, `transformation`, or `destination` node may additionally specify the following options (an example combining several of them is sketched after this list):
- `debug: True`, which outputs the dataframe shape and columns after the node completes processing (this can be helpful for building and debugging)
- `require_rows: True` or `require_rows: 10` to have earthmover exit with an error if 0 (for `True`) or fewer than 10 (for `10`) rows are present in the dataframe after the node completes processing
- `show_progress: True` to display a progress bar while processing this node
- `repartition: True` to repartition the node in memory before continuing to the next node; set either a number of bytes or a text representation (e.g., "100MB") to shuffle data into new partitions of that size (Note: this configuration is advanced, and its use may drastically affect performance)
Jinja templates

earthmover destinations can specify a Jinja template to render for each row of the final dataframe — a text file (JSON, XML, HTML, etc.) containing Jinja with references to the columns of the row.
An example Jinja template for JSON could be
{
"studentUniqueId": "{{student_id}}",
"birthDate": "{{birth_date}}",
"firstName": "{{first_name}}",
"lastSurname": "{{last_name}}"
}
Using a linter on the output of `earthmover run` can help catch syntax mistakes when developing Jinja templates.