Assign a unique identifier to an order to link the specific output back to its original order. Youโll be able to do the following:
- Track the orderโs status.
- Retrieve the delivered data.
- Use the data as input for a processing job.
- Find all data related to the processing job (both input and output).
# Run this cell to set your unique order identifierUNIQUE_IDENTIFIER_NAME = "Example_Unique_Identifier_123456"!pip install up42-py --upgrade -q
import up42, pathlib, geojsonfrom up42 import processing_templatesfrom itertools import islicefrom datetime import datetimeRun the cell below to create a credentials.json file in a directory named .up42 in your home folder.
# Defines the credentials file path if it doesn't existcredentials_file_path = pathlib.Path.home().joinpath(".up42/credentials.json")credentials_file_path.parent.mkdir(parents=True, exist_ok=True)credentials_file_path.touch(exist_ok=True)
# Prints the path to the fileprint(f"Credentials file is located at: {credentials_file_path}")- Click the link above to the created file and paste the following code:
JSON {"username": "<your-email-address>","password": "<your-password>"} - Retrieve the email address and password used for logging into the console. Use them as values for username and password.
- Save the
credentials.jsonfile.
up42.authenticate(cfg_file=credentials_file_path)
# Use region if you're logging with the NSG service# up42.authenticate(cfg_file=credentials_file_path, region="sa")Define the type of catalog scene you want to search for.
# Select a collection and a data productcollection_name = "sentinel-2"data_product_name = "sentinel-2-level-2a"
# Specify the maximum cloud cover percentage from 0 to 100max_cloud_coverage_percentage = 10
# Specify start and end dates for the search in the YYYY, M, D formatstart_date = datetime(2023, 6, 1)end_date = datetime(2024, 12, 31)
geometry = { "type": "Polygon", "coordinates": [ [ [13.369713, 52.452327], [13.369713, 52.470760], [13.339159, 52.470760], [13.339159, 52.452327], [13.369713, 52.452327], ] ],}Search for catalog scenes with the defined search parameters.
# Fetch the selected collection and data productcollections = up42.ProductGlossary.get_collections()collection = next((c for c in collections if c.name == collection_name), None)data_product = next( (p for p in collection.data_products if data_product_name in p.name), None)
# Find the host providerhost = next((p for p in collection.providers if p.is_host), None)
# Search for scenesscenes = list( host.search( collections=[collection.name], intersects=geometry, start_date=start_date, end_date=end_date, query={"cloudCoverage": {"LT": max_cloud_coverage_percentage}}, ))
print(f"Found {len(scenes)} scenes matching the search criteria.\n")
for scene in islice(scenes, 0, 5): # Print first 5 results print(f"- Scene ID: {scene.id}") print(f" Geometry: {scene.geometry}") print(f" Acquisition date and time: {scene.datetime}") print(f" Cloud coverage: {scene.cloud_coverage}%") print(f" Resolution: {scene.resolution} m") print(f" Delivery time: {scene.delivery_time}\n")Select a catalog scene to order and estimate its cost.
# Select a scenescene_id = "S2A_T33UUU_20241024T101201_L2A"
aoi = geojson.FeatureCollection(features=[geojson.Feature(geometry=geometry)])
order_template = up42.BatchOrderTemplate( data_product_id=data_product.id, display_name=UNIQUE_IDENTIFIER_NAME, features=aoi, params={"id": scene_id},)
order_template.estimateCreate an order and periodically check its status.
# Create an orderorder_references = order_template.place()order = order_references[0].order
# Print the order object to display its detailsprint(order)
print(f"\nOrder successfully created with the name: '{UNIQUE_IDENTIFIER_NAME}'.")
# Check order status and take actionif order.status == "FULFILLED": print(f"Order '{UNIQUE_IDENTIFIER_NAME}' is fulfilled.")else: print(f"Order '{UNIQUE_IDENTIFIER_NAME}' isn't fulfilled yet. Tracking...") order.track(report_time=150)Once an order is fulfilled, the data is delivered to your UP42 storage. Search your storage for this specific data using the same unique identifier, which is stored in the up42-user:title metadata field.
# Initialize the STAC client to search your UP42 storageUP42_client = up42.stac_client()
# Create a filter to find items where the title contains our unique namefilter = { "op": "a_contains", "args": [ {"property": "up42-user:title"}, UNIQUE_IDENTIFIER_NAME, ],}
# Perform the search and retrieve the first matching itemstac_items_search = UP42_client.search(filter=filter)stac_item = list(stac_items_search.items())[0]print(f"Found delivered item with ID: {stac_item.id}")Once you have your input data, you can run a processing job on it. Validate and estimate your job.
# Create a unique title for the processing job's outputjob_title = f"{UNIQUE_IDENTIFIER_NAME}_processing_output"
# Create the job template using our STAC item as inputjob_template = processing_templates.UpsamplingNSSentinel( title=job_title, item=stac_item)
# Assert that the job is valid, print errors if notif not job_template.is_valid: for error in job_template.errors: print(f"{error}\\n")
print(f"The estimated cost is {job_template.cost.credits} credits.")# Execute the job and track its progressjob = job_template.execute()job.track(wait=20)
print("\nTracking complete!")print(f"The final job status is: {job.status.value}")Find the new item by searching for the unique job_title you assigned it.
# Create a filter to find the output using the unique job titlefilter = { "op": "a_contains", "args": [ {"property": "title"}, job_title, ],}
stac_items_search = UP42_client.search(filter=filter)stac_items_list = list(stac_items_search.items())processed_output_stac_item = stac_items_list[0]Update the metadata of the processed STAC item, adding our original UNIQUE_IDENTIFIER_NAME to its up42-user:title field. This ensures that both the input and output assets share a common identifier.
# Set the "up42-user:title" to link this output back to the original orderprocessed_output_stac_item.up42.title = job_titleprocessed_output_stac_item.update()
print("Updated processed item's metadata to include the unique identifier.\n")# Define output to check the changesprint(f"STAC item ID: {processed_output_stac_item.id}")print(f"User title: {processed_output_stac_item.up42.title}")Perform a single search to retrieve both the original image and its processed output. The following search looks for any STAC item where the up42-user:title contains the root UNIQUE_IDENTIFIER_NAME.
filter = { "op": "a_contains", "args": [ {"property": "up42-user:title"}, UNIQUE_IDENTIFIER_NAME, ],}
stac_items_search = UP42_client.search(filter=filter)
# This list will now include both the processed output and the input STAC itemstac_items_list = list(stac_items_search.items())
print( f"Found {len(stac_items_list)} related items using the unique identifier '{UNIQUE_IDENTIFIER_NAME}':\n")for item in stac_items_list: print(f"- Item ID: {item.id}") print(f" Filename: {item.properties['title']}") print(f" User title: {item.properties['up42-user:title']}\n")