Track outputs related to a catalog order

Assign a unique identifier to an order to link the specific output back to its original order.

With this identifier, you'll be able to do the following:

  • Track the order's status.
  • Retrieve the delivered data.
  • Use the data as input for a processing job.
  • Find all data related to the processing job (both input and output).
Python
# Run this cell to set your unique order identifier
UNIQUE_IDENTIFIER_NAME = "Example_Unique_Identifier_123456"
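
Because later steps search storage for titles that contain this identifier, reusing the same value across runs can return items from earlier runs. The following is a minimal sketch of one way to generate a fresh identifier per run with the standard library only. The helper name make_unique_identifier and the prefix are illustrative assumptions, not part of the UP42 SDK.

```python
import uuid
from datetime import datetime, timezone


def make_unique_identifier(prefix: str = "Example") -> str:
    """Hypothetical helper: prefix + UTC timestamp + short random suffix."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    suffix = uuid.uuid4().hex[:8]  # 8 random hex characters
    return f"{prefix}_{timestamp}_{suffix}"


UNIQUE_IDENTIFIER_NAME = make_unique_identifier()
print(UNIQUE_IDENTIFIER_NAME)
```

If you use a generated value, note it down, since you need the same string to find the order and its outputs later.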

Set up the notebook

1. Install dependencies

Python
!pip install up42-py --upgrade -q
import up42, pathlib, geojson
from up42 import processing_templates
from itertools import islice
from datetime import datetime

2. Configure credentials

Run the cell below to create a credentials.json file in a directory named .up42 in your home folder.

Python
# Create the credentials file and its parent directory if they don't exist
credentials_file_path = pathlib.Path.home().joinpath(".up42/credentials.json")
credentials_file_path.parent.mkdir(parents=True, exist_ok=True)
credentials_file_path.touch(exist_ok=True)
# Prints the path to the file
print(f"Credentials file is located at: {credentials_file_path}")
  1. Open the created file at the path printed above and paste the following code:
    JSON
    {
      "username": "<your-email-address>",
      "password": "<your-password>"
    }
  2. Replace the placeholder values of username and password with the email address and password you use to log in to the console.
  3. Save the credentials.json file.
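
Before authenticating, it can help to confirm that the file is valid JSON and that the placeholders were replaced. The sketch below is one possible check written with the standard library; check_credentials_file is a hypothetical helper, not an SDK function, and it runs here against a temporary file rather than your real credentials.

```python
import json
import pathlib
import tempfile


def check_credentials_file(path: pathlib.Path) -> list[str]:
    """Return a list of problems with the credentials file (empty if none)."""
    try:
        content = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as error:
        return [f"Cannot read {path}: {error}"]
    if not isinstance(content, dict):
        return ["The file must contain a JSON object"]
    problems = []
    for key in ("username", "password"):
        value = content.get(key)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"'{key}' is missing or empty")
        elif value.startswith("<") and value.endswith(">"):
            problems.append(f"'{key}' still contains the placeholder {value}")
    return problems


# Demonstration on a temporary file so that real credentials stay untouched
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = pathlib.Path(tmp_dir) / "credentials.json"
    demo_path.write_text(
        json.dumps({"username": "<your-email-address>", "password": ""}),
        encoding="utf-8",
    )
    demo_problems = check_credentials_file(demo_path)

for problem in demo_problems:
    print(problem)
```

In the notebook, you could call check_credentials_file(credentials_file_path) and proceed to authentication only when the returned list is empty.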

3. Authenticate

Python
up42.authenticate(cfg_file=credentials_file_path)
# Use region if you're logging in with the NSG service
# up42.authenticate(cfg_file=credentials_file_path, region="sa")

Step 1. Find and order catalog data

1. Define search parameters

Define the type of catalog scene you want to search for.

Python
# Select a collection and a data product
collection_name = "sentinel-2"
data_product_name = "sentinel-2-level-2a"
# Specify the maximum cloud cover percentage from 0 to 100
max_cloud_coverage_percentage = 10
# Specify start and end dates for the search in the YYYY, M, D format
start_date = datetime(2023, 6, 1)
end_date = datetime(2024, 12, 31)
geometry = {
    "type": "Polygon",
    "coordinates": [
        [
            [13.369713, 52.452327],
            [13.369713, 52.470760],
            [13.339159, 52.470760],
            [13.339159, 52.452327],
            [13.369713, 52.452327],
        ]
    ],
}
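
If you replace the geometry with your own area of interest, a quick structural check can catch common mistakes before the search. The sketch below applies the GeoJSON rules for polygons: each ring is closed, has at least four positions, and lists longitude before latitude. The function name validate_polygon is a hypothetical helper, and the check does not cover any additional limits the platform may impose on geometries.

```python
def validate_polygon(geometry: dict) -> None:
    """Raise ValueError if the GeoJSON polygon is structurally invalid."""
    if geometry.get("type") != "Polygon":
        raise ValueError("Geometry type must be 'Polygon'")
    for ring in geometry.get("coordinates", []):
        if len(ring) < 4:
            raise ValueError("A ring needs at least 4 positions")
        if ring[0] != ring[-1]:
            raise ValueError("A ring must end where it starts")
        for lon, lat in ring:
            # GeoJSON positions are ordered longitude, latitude
            if not (-180 <= lon <= 180 and -90 <= lat <= 90):
                raise ValueError(f"Position out of range: {[lon, lat]}")


geometry = {
    "type": "Polygon",
    "coordinates": [
        [
            [13.369713, 52.452327],
            [13.369713, 52.470760],
            [13.339159, 52.470760],
            [13.339159, 52.452327],
            [13.369713, 52.452327],
        ]
    ],
}
validate_polygon(geometry)
print("Geometry passed the structural checks.")
```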

2. Search for scenes

Search for catalog scenes with the defined search parameters.

Python
# Fetch the selected collection and data product
collections = up42.ProductGlossary.get_collections()
collection = next((c for c in collections if c.name == collection_name), None)
data_product = next(
    (p for p in collection.data_products if data_product_name in p.name), None
)
# Find the host provider
host = next((p for p in collection.providers if p.is_host), None)
# Search for scenes
scenes = list(
    host.search(
        collections=[collection.name],
        intersects=geometry,
        start_date=start_date,
        end_date=end_date,
        query={"cloudCoverage": {"LT": max_cloud_coverage_percentage}},
    )
)
print(f"Found {len(scenes)} scenes matching the search criteria.\n")
for scene in islice(scenes, 0, 5):  # Print first 5 results
    print(f"- Scene ID: {scene.id}")
    print(f" Geometry: {scene.geometry}")
    print(f" Acquisition date and time: {scene.datetime}")
    print(f" Cloud coverage: {scene.cloud_coverage}%")
    print(f" Resolution: {scene.resolution} m")
    print(f" Delivery time: {scene.delivery_time}\n")
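
The lookups above use next(..., None), so a misspelled collection or data product name yields None and the failure surfaces later as an AttributeError. A small wrapper can report the missing item right away. This is a sketch under the assumption that you prefer to fail early; first_match is a hypothetical helper, and the SimpleNamespace objects only stand in for the SDK's collection objects.

```python
from types import SimpleNamespace


def first_match(items, predicate, description: str):
    """Return the first item satisfying predicate, or raise a clear error."""
    match = next((item for item in items if predicate(item)), None)
    if match is None:
        raise LookupError(f"No {description} found")
    return match


# Stand-in objects that only mimic the 'name' attribute used in the search code
collections = [SimpleNamespace(name="other"), SimpleNamespace(name="sentinel-2")]
collection = first_match(
    collections,
    lambda c: c.name == "sentinel-2",
    "collection named 'sentinel-2'",
)
print(collection.name)
```

The same helper could wrap the data product and host provider lookups, each with its own description.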

3. Select a scene and estimate an order

Select a catalog scene to order and estimate its cost.

Python
# Select a scene
scene_id = "S2A_T33UUU_20241024T101201_L2A"
aoi = geojson.FeatureCollection(features=[geojson.Feature(geometry=geometry)])
order_template = up42.BatchOrderTemplate(
    data_product_id=data_product.id,
    display_name=UNIQUE_IDENTIFIER_NAME,
    features=aoi,
    params={"id": scene_id},
)
order_template.estimate
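
The scene ID above is hardcoded. As an alternative, you could pick a scene from the search results, for example the one with the lowest cloud coverage. The sketch below assumes only the id and cloud_coverage attributes printed in the search step; pick_clearest_scene is a hypothetical helper, and the stand-in scenes use made-up IDs.

```python
from types import SimpleNamespace


def pick_clearest_scene(scenes):
    """Return the scene with the lowest known cloud coverage."""
    candidates = [s for s in scenes if s.cloud_coverage is not None]
    if not candidates:
        raise ValueError("No scenes with known cloud coverage")
    return min(candidates, key=lambda s: s.cloud_coverage)


# Stand-in search results with made-up IDs and values
scenes = [
    SimpleNamespace(id="scene-a", cloud_coverage=7.5),
    SimpleNamespace(id="scene-b", cloud_coverage=1.2),
    SimpleNamespace(id="scene-c", cloud_coverage=None),
]
scene_id = pick_clearest_scene(scenes).id
print(f"Selected scene: {scene_id}")
```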

4. Create and track the order

Create an order and periodically check its status.

Python
# Create an order
order_references = order_template.place()
order = order_references[0].order
# Print the order object to display its details
print(order)
print(f"\nOrder successfully created with the name: '{UNIQUE_IDENTIFIER_NAME}'.")
# Check order status and take action
if order.status == "FULFILLED":
    print(f"Order '{UNIQUE_IDENTIFIER_NAME}' is fulfilled.")
else:
    print(f"Order '{UNIQUE_IDENTIFIER_NAME}' isn't fulfilled yet. Tracking...")
    order.track(report_time=150)
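
To illustrate what periodic status checking involves, here is a generic polling loop with an upper bound on the number of checks. It is a sketch only and does not describe how order.track is implemented. The function wait_for_status and the "PENDING" value are assumptions made for the demonstration; only "FULFILLED" comes from the code above.

```python
import time


def wait_for_status(get_status, done_statuses, interval_seconds=150,
                    max_checks=100, sleep=time.sleep):
    """Call get_status() until it returns a status in done_statuses."""
    for check in range(1, max_checks + 1):
        status = get_status()
        print(f"Check {check}: status is {status}")
        if status in done_statuses:
            return status
        if check < max_checks:
            sleep(interval_seconds)
    raise TimeoutError(f"Status not reached after {max_checks} checks")


# Stand-in status source; "PENDING" is a placeholder, not a documented status
fake_statuses = iter(["PENDING", "PENDING", "FULFILLED"])
final_status = wait_for_status(
    get_status=lambda: next(fake_statuses),
    done_statuses={"FULFILLED"},
    sleep=lambda seconds: None,  # skip real waiting in this demonstration
)
print(f"Final status: {final_status}")
```

Bounding the number of checks means the notebook cell eventually stops with an error instead of waiting indefinitely.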

5. Find the delivered data

Once an order is fulfilled, the data is delivered to your UP42 storage. Search your storage for this specific data using the same unique identifier, which is stored in the up42-user:title metadata field.

Python
# Initialize the STAC client to search your UP42 storage
UP42_client = up42.stac_client()
# Create a filter to find items where the title contains our unique name
filter = {
    "op": "a_contains",
    "args": [
        {"property": "up42-user:title"},
        UNIQUE_IDENTIFIER_NAME,
    ],
}
# Perform the search and retrieve the first matching item
stac_items_search = UP42_client.search(filter=filter)
stac_item = list(stac_items_search.items())[0]
print(f"Found delivered item with ID: {stac_item.id}")
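
This tutorial builds the same filter shape several times, and indexing the result list with [0] raises an IndexError when nothing has been delivered yet. The sketch below shows two small helpers that address both points. Both names, contains_filter and first_or_none, are hypothetical and not part of the SDK; the filter dictionary mirrors the one used above.

```python
def contains_filter(property_name: str, value: str) -> dict:
    """Build the filter dictionary used in this tutorial's storage searches."""
    return {
        "op": "a_contains",
        "args": [
            {"property": property_name},
            value,
        ],
    }


def first_or_none(items):
    """Return the first element of an iterable, or None if it is empty."""
    return next(iter(items), None)


order_filter = contains_filter("up42-user:title", "Example_Unique_Identifier_123456")
print(order_filter)
print(first_or_none([]))
```

In the notebook, you could pass the result of contains_filter to the search call and check whether first_or_none returned None before reading the item's ID.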

Step 2. Process the data

1. Validate and estimate a processing job

Once you have your input data, you can run a processing job on it. Validate and estimate your job.

Python
# Create a unique title for the processing job's output
job_title = f"{UNIQUE_IDENTIFIER_NAME}_processing_output"
# Create the job template using our STAC item as input
job_template = processing_templates.UpsamplingNSSentinel(
    title=job_title, item=stac_item
)
# Check that the job is valid and print any errors
if not job_template.is_valid:
    for error in job_template.errors:
        print(f"{error}\n")
print(f"The estimated cost is {job_template.cost.credits} credits.")

2. Run a processing job

Python
# Execute the job and track its progress
job = job_template.execute()
job.track(wait=20)
print("\nTracking complete!")
print(f"The final job status is: {job.status.value}")

3. Retrieve the processing output

Find the new item by searching for the unique job_title you assigned to it.

Python
# Create a filter to find the output using the unique job title
filter = {
    "op": "a_contains",
    "args": [
        {"property": "title"},
        job_title,
    ],
}
stac_items_search = UP42_client.search(filter=filter)
stac_items_list = list(stac_items_search.items())
processed_output_stac_item = stac_items_list[0]

Update the metadata of the processed STAC item by setting its up42-user:title field to job_title, which starts with the original UNIQUE_IDENTIFIER_NAME. This ensures that both the input and output assets share a common identifier.

Python
# Set the "up42-user:title" to link this output back to the original order
processed_output_stac_item.up42.title = job_title
processed_output_stac_item.update()
print("Updated processed item's metadata to include the unique identifier.\n")
# Print the item's details to check the changes
print(f"STAC item ID: {processed_output_stac_item.id}")
print(f"User title: {processed_output_stac_item.up42.title}")

Perform a single search to retrieve both the original image and its processed output. The following search looks for any STAC item where the up42-user:title contains the root UNIQUE_IDENTIFIER_NAME.

Python
filter = {
    "op": "a_contains",
    "args": [
        {"property": "up42-user:title"},
        UNIQUE_IDENTIFIER_NAME,
    ],
}
stac_items_search = UP42_client.search(filter=filter)
# This list will now include both the processed output and the input STAC item
stac_items_list = list(stac_items_search.items())
print(
    f"Found {len(stac_items_list)} related items using the unique identifier '{UNIQUE_IDENTIFIER_NAME}':\n"
)
for item in stac_items_list:
    print(f"- Item ID: {item.id}")
    print(f" Filename: {item.properties['title']}")
    print(f" User title: {item.properties['up42-user:title']}\n")
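
Since the processed item's user title is the identifier followed by _processing_output, the combined results can be separated into inputs and outputs by checking that suffix. The following is a minimal sketch that works on plain property dictionaries; split_inputs_and_outputs is a hypothetical helper, and the sample filenames are made up.

```python
def split_inputs_and_outputs(properties_list, identifier,
                             output_suffix="_processing_output"):
    """Split item properties into (inputs, outputs) by their user title."""
    inputs, outputs = [], []
    for properties in properties_list:
        user_title = properties.get("up42-user:title") or ""
        if identifier not in user_title:
            continue  # unrelated item
        if user_title.endswith(output_suffix):
            outputs.append(properties)
        else:
            inputs.append(properties)
    return inputs, outputs


identifier = "Example_Unique_Identifier_123456"
# Stand-in properties with made-up filenames
sample_properties = [
    {"title": "input.tif", "up42-user:title": identifier},
    {"title": "output.tif", "up42-user:title": f"{identifier}_processing_output"},
    {"title": "other.tif", "up42-user:title": "Unrelated"},
]
inputs, outputs = split_inputs_and_outputs(sample_properties, identifier)
print(f"Inputs: {[p['title'] for p in inputs]}")
print(f"Outputs: {[p['title'] for p in outputs]}")
```

With the search results from this tutorial, you could pass [item.properties for item in stac_items_list] as the first argument.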