Traveler APIを利用し、シーンを検索して衛星データをダウンロードする(後編)

はじめに

本記事ではJupyterLabを使ってTellus Satellite Data Traveler APIを利用し、衛星データをダウンロードする方法を 前編、後編に分けて紹介します。

前編ではシーンを検索し、シーン情報を取得します。後編ではシーンのファイル情報を取得し、データのダウンロードURLを生成します。今回はその後編です。

ここで紹介しているAPIに使用するデータはTellus Travelerのデータセット詳細画面の「購入設定」>「販売方式」の欄に「Tellusアーカイブ」と記載のあるアーカイブデータセットを使用してください。

APIトークンの発行やrequests モジュールのインストール、検索条件を指定してシーンを検索する、検索結果を元にシーン情報を取得する方法については 前編 を、APIの詳細については こちら をご覧ください。

本記事においてJupyterLabを起動し、サンプルコードを貼り付ける際はTOKEN("TOKENXXXXXXXXXXXXXXXXXX"と表示されている部分)にAPIトークンを貼り付けてください。

事前準備

Traveler APIを使用するためにはAPIトークンが必要です。また、サンプルコードの実行にはrequestsモジュールが必要です。APIトークンの発行方法、モジュールのインストール方法については前編を参照してください。

Pillow、matplotlibライブラリーのインストール

JupyterLab上で画像処理を行うためにPillow、matplotlibのインストールを行います。

Macであればターミナル、Windowsであればコマンドプロンプトで以下のコマンドを実行してください。JupyterLab上でコマンドを実行する場合はコマンドの頭に ! を付けてください。

Pillowインストール

pip install Pillow

JupyterLabでの実行イメージ

traveler_api_p2-01.PNG

Matplotlibインストール

pip install matplotlib

JupyterLabでの実行イメージ

traveler_api_p2-02.PNG

pip listでインストールできているか確認できます。

シーンのファイル情報を取得する

ファイル情報の一覧を取得するためにAPIの​/datasets​/{dataset_id}​/data​/{data_id}​/files​/を使用します。

JupyterLabを起動し以下のサンプルコードを貼り付けます。

関数呼び出し時の引数をファイル情報を取得したいシーン情報に差し替えていただくと、そのシーンでのファイル情報一覧が取得できます。

import requests
TOKEN = "TOKENXXXXXXXXXXXXXXXXXX"
# API 呼び出しの共通設定
BASE_URL = "https://www.tellusxdp.com/api/traveler/v1"
REQUESTS_HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}
def get_files_list(dataset_id, data_id):
    url = "{}/datasets/{}/data/{}/files/".format(BASE_URL, dataset_id , data_id)
    res = requests.get(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()
print(get_files_list("ea71ef6e-9569-49fc-be16-ba98d876fb73","ced28b22-535a-48bb-bb1f-53f04ca4a3ee"))

JupyterLabの画面イメージは以下のようになります。

枠で囲っている部分はご自身のAPIトークンと調べたいシーン情報に置き換えてください。

230823-12.png

dataset_id (データセットID)には、Travelerのデータセット詳細ページのIDを入力し、data_id(シーンID)には取得シーン詳細ページのIDを入力してください。

さらにAPIの/datasets/{dataset_id}/data/{data_id}/files/{file_id}/を使用するとシーンとファイルを指定してファイル情報を取得することができます。

ダウンロードURLを生成する

選択したファイルのダウンロードURLを生成するためにAPIの​/datasets/{ dataset_id }/data/{ data_id }/files/{ file_id }/download-url/を使用します。

JupyterLabに以下のサンプルコードを貼り付けます。

関数呼び出し時の引数を、ダウンロードURLを生成したいファイル情報に書き換えてください。

import requests
TOKEN = "TOKENXXXXXXXXXXXXXXXXXX"
# API 呼び出しの共通設定
BASE_URL = "https://www.tellusxdp.com/api/traveler/v1"
REQUESTS_HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}
def get_download_url(dataset_id, data_id, file_id):
    url = "{}/datasets/{}/data/{}/files/{}/download-url/".format(BASE_URL, dataset_id, data_id, file_id)
    res = requests.post(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()
print(get_download_url("ea71ef6e-9569-49fc-be16-ba98d876fb73","ced28b22-535a-48bb-bb1f-53f04ca4a3ee","9"))

JupyterLabの画面イメージは以下のようになります。

実行結果のURLからファイルをブラウザ上に表示することができます。生成されたダウンロードURLの有効期限は1時間です。

枠で囲っている部分はご自身のAPIトークンと調べたいシーン情報に置き換えてください。

230823-13.png

実行時に403エラーが表示された場合は、該当の衛星データの利用可能な環境を確認してください。

「Tellus環境でのみ利用可能」となっているデータについては、Tellusで提供している 開発環境 からのみAPIを利用できます。

traveler_api_p2-05_1.PNG

なお、ファイルのダウンロードURL生成はTellus Traveler画面でも行うことができます。
表示するにはTellus Travelerのシーン詳細画面の下部に表示されている、ファイル名の右側にある「URLを表示」ボタンをクリックすると表示します。

traveler_api_p2-06_1.PNG

次に生成したダウンロードURLから衛星データをファイル出力します。

以下のサンプルコードを貼り付けて実行してください。

本記事の冒頭でインストールしたpillowとmatplotlibを使用します。

import requests
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
TOKEN = "TOKENXXXXXXXXXXXXXXXXXX"
# API 呼び出しの共通設定
BASE_URL = "https://www.tellusxdp.com/api/traveler/v1"
REQUESTS_HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}
#ファイル情報取得API
def get_file_info(dataset_id, data_id, file_id):
    url = "{}/datasets/{}/data/{}/files/{}/".format(BASE_URL, dataset_id, data_id, file_id)
    res = requests.get(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()
#ダウンロードURL取得API
def get_download_url(dataset_id, data_id, file_id):
    url = "{}/datasets/{}/data/{}/files/{}/download-url/".format(BASE_URL, dataset_id, data_id, file_id)
    res = requests.post(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()
#ダウンロードしたいデータのファイル情報
dataset_id="ea71ef6e-9569-49fc-be16-ba98d876fb73"
data_id="ced28b22-535a-48bb-bb1f-53f04ca4a3ee"
file_id="4"
#APIを呼び出す
download_url = get_download_url(dataset_id,data_id,file_id)
name = get_file_info(dataset_id,data_id,file_id)
#取得した情報から必要な情報を取り出す
url = download_url["download_url"]
file_name = name["name"]
size_bytes = name["size_bytes"]
print("ファイル名:" + file_name)
print("ファイルサイズ:" + str(size_bytes))
#ファイルを作成して取得したファイル内容を書き出す
response = requests.get(url)
with open(file_name, "wb") as f:
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
            f.flush()
#出力したファイル画像を表示する
im1 = Image.open(file_name)
plt.imshow(im1)

JupyterLabの画面イメージは以下のようになります。

枠で囲っている部分はご自身のAPIトークンと出力したいファイル情報に置き換えてください。

230823-14.png

画像ファイルによってはサイズがとても大きい(数百MiB~数GiB)場合があるため注意が必要です。ファイルが大きいと表示に少し時間がかかる場合があります。

取得したファイル名で新しくファイルが作成され、JupyterLab上に画像が表示されます。

230823-15.png

複数ファイルを一括でダウンロードする

これまでに、特定のシーンの1ファイルをダウンロードするスクリプトをご紹介しました。ここからはスクリプトにさらに手を加えて、複数シーンのファイルを一気にダウンロードできる方法を紹介します。

ファイルはシーンに登録されている全ファイルを対象とします。

scenes に対象のデータセットIDとシーンIDを指定して実行してください。

Tellus にはユーザー自身の環境にデータをダウンロードできないデータセットも存在するため、スクリプトでは予めそれらのデータセットは除外しています。

スクリプトを実行した際にダウンロードされる予定のファイルサイズ合計が表示されます。確認して問題がなければ y を入力してダウンロードを開始してください。ダウンロード対象を変更したりダウンロードを中止する場合は、n を入力して処理を終了してください。必要に応じて対象シーン情報を変更して、スクリプトを再実行してください。

なお、ダウンロードされるファイル数が増えるとファイルサイズ合計が大きくなるので、ダウンロード先の空き容量にご注意ください。

import requests
import os
TOKEN = "TOKENXXXXXXXXXXXXXXXXXX"
# API 呼び出しの共通設定
BASE_URL = "https://www.tellusxdp.com/api/traveler/v1"
REQUESTS_HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}
### scenes 任意のIDを指定 ###
scenes = [
    {"dataset_id": "000eb404-1f69-4735-a966-2f3115269ee3", "id": "b3ec6d59-b709-4585-99a2-287587402d08"}, # GCOM-C/SST 8日平均 (高次プロダクト) GC1SG1_20230626A08D_D0000_3MSG_SST_F_3000
    {"dataset_id": "182d2ec2-e296-4e18-9c1a-5f769416f23d", "id": "b0470d6f-e4b1-4ef5-af3d-6984c02d04a0"}, # GCOM-C/SST 準リアルタイム(高次プロダクト) GC1SG1_202307190203A05812_L2SL_SSTDQ_3001_SST
]
def filter_downloadable_scenes(scenes):
    url = "{}/datasets/".format(BASE_URL)
    params = {
        "is_order_required": False
    }
    res = requests.get(url, headers=REQUESTS_HEADERS, params=params)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    datasets = list(filter(lambda x: x["permission"]["allow_network_type"] == "global", res.json()["results"]))
    return list(filter(lambda x: x["dataset_id"] in [d.get("id") for d in datasets], scenes))
def _fetch_scene_files(dataset_id, data_id):
    url = "{}/datasets/{}/data/{}/files/".format(BASE_URL, dataset_id, data_id)
    res = requests.get(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        log_prefix = "データセット:{} シーン:{} ".format(dataset_id, data_id)
        raise SystemError(log_prefix + "エラー:{}".format(err))
    return res.json()["results"]
def fetch_downloadable_files(scenes):
    result = []
    for v in scenes:
        files = filter(
            lambda x: x["is_downloadable"] == True and x["require_archived_file_download"] == False,
            _fetch_scene_files(v["dataset_id"], v["id"]),
        )
        for y in files:
            merged = {**v, **{"fileinfo": y}}
            result.append(merged)
    return result
def confirm(message, default="n"):
    if default not in ["n", "y"]:
        default = "n"
    selector = "[y/N]" if default == "n" else "[Y/n]"
    while True:
        answer = input(message + selector + " : ")
        if answer in ["n", "no"] or (default == "n" and answer == ""):
            return False
        elif answer in ["y", "ye", "yes"] or (default == "y" and answer == ""):
            return True
def _get_download_url(dataset_id, data_id, file_id):
    log_prefix = "データセット:{} シーン:{} ファイル:{} ".format(dataset_id, data_id, file_id)
    url = "{}/datasets/{}/data/{}/files/{}/download-url/".format(BASE_URL, dataset_id, data_id, file_id)
    res = requests.post(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError(log_prefix + "エラー:{}".format(err))
    return res.json()
def download_data_file(dataset_id, data_id, fileinfo):
    path = "./dataset-{}/data-{}/".format(dataset_id, data_id)
    os.makedirs(path, exist_ok=True)
    log_prefix = "データセット:{} シーン:{} ファイル:{} ".format(dataset_id, data_id, fileinfo["name"])
    print(log_prefix + "ファイルダウンロードを開始")
    url = _get_download_url(dataset_id, data_id, fileinfo["id"])["download_url"]
    res = requests.get(url, stream=True)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError(log_prefix + "エラー:{}".format(err))
    path = "./dataset-{}/data-{}/{}".format(
        dataset_id, data_id, fileinfo["name"],
    )
    with open(path, "wb") as f:
        for chunk in res.iter_content(chunk_size=10240):
            f.write(chunk)
            f.flush()
    print(log_prefix + "ファイルダウンロードを終了")
downloadable_scenes = filter_downloadable_scenes(scenes)
files = fetch_downloadable_files(downloadable_scenes)
file_num = len(files)
print("ダウンロード予定ファイル数 : " + str(file_num))
print("ダウンロード予定ファイルサイズ合計 : " + "{:,}".format(sum([d.get("fileinfo")["size_bytes"] for d in files])) + "Bytes")
if confirm("ダウンロードを開始しますか?"):
    print("ファイルダウンロードを開始")    
    for i, v in enumerate(files):
        print("[{}/{}]".format(i + 1, file_num))
        download_data_file(v["dataset_id"], v["id"], v["fileinfo"])
    print("-----スクリプトを終了-----")

230823-10.png

データセット、シーンごとにフォルダが作られ、その中にファイルが保存されます。

前編でシーンを検索するスクリプトをご紹介しましたが、その結果シーンを対象としてファイルをダウンロードすることもできます。前編のスクリプトと合体し、scenes に検索結果のシーンを代入します。

前編で行ったのと同様に、保存した検索条件 json ファイルをスクリプトファイルと同じフォルダに保存し、`%%JSON_FILE_NAME%%` を json ファイル名に書き換えて実行してください。

以下にサンプルコードをご紹介します。

import json
import requests
import os
TOKEN = "TOKENXXXXXXXXXXXXXXXXXX"
# API 呼び出しの共通設定
BASE_URL = "https://www.tellusxdp.com/api/traveler/v1"
REQUESTS_HEADERS = {
    "Authorization": "Bearer " + TOKEN,
    "Content-Type": "application/json",
}
def search_scene_by_json(filename):
    condition = json.load(open(filename))    
    url = "{}/data-search/".format(BASE_URL)
    res = requests.post(url, headers=REQUESTS_HEADERS, json=condition)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()["features"]
def filter_downloadable_scenes(scenes):
    url = "{}/datasets/".format(BASE_URL)
    params = {
        "is_order_required": False
    }
    res = requests.get(url, headers=REQUESTS_HEADERS, params=params)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    datasets = list(filter(lambda x: x["permission"]["allow_network_type"] == "global", res.json()["results"]))
    return list(filter(lambda x: x["dataset_id"] in [d.get("id") for d in datasets], scenes))
def _fetch_scene_files(dataset_id, data_id):
    url = "{}/datasets/{}/data/{}/files/".format(BASE_URL, dataset_id, data_id)
    res = requests.get(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        log_prefix = "データセット:{} シーン:{} ".format(dataset_id, data_id)
        raise SystemError(log_prefix + "エラー:{}".format(err))
    return res.json()["results"]
def fetch_downloadable_files(scenes):
    result = []
    for v in scenes:
        files = filter(
            lambda x: x["is_downloadable"] == True and x["require_archived_file_download"] == False,
            _fetch_scene_files(v["dataset_id"], v["id"]),
        )
        for y in files:
            merged = {**v, **{"fileinfo": y}}
            result.append(merged)
    return result
def confirm(message, default="n"):
    if default not in ["n", "y"]:
        default = "n"
    selector = "[y/N]" if default == "n" else "[Y/n]"
    while True:
        answer = input(message + selector + " : ")
        if answer in ["n", "no"] or (default == "n" and answer == ""):
            return False
        elif answer in ["y", "ye", "yes"] or (default == "y" and answer == ""):
            return True
def _get_download_url(dataset_id, data_id, file_id):
    log_prefix = "データセット:{} シーン:{} ファイル:{} ".format(dataset_id, data_id, file_id)
    url = "{}/datasets/{}/data/{}/files/{}/download-url/".format(BASE_URL, dataset_id, data_id, file_id)
    res = requests.post(url, headers=REQUESTS_HEADERS)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    return res.json()
def download_data_file(dataset_id, data_id, fileinfo):
    path = "./dataset-{}/data-{}/".format(dataset_id, data_id)
    os.makedirs(path, exist_ok=True)
    log_prefix = "データセット:{} シーン:{} ファイル:{} ".format(dataset_id, data_id, fileinfo["name"])
    print(log_prefix + "ファイルダウンロードを開始")
    url = _get_download_url(dataset_id, data_id, fileinfo["id"])["download_url"]
    res = requests.get(url, stream=True)
    try:
        res.raise_for_status()
    except Exception as err:
        raise SystemError("エラー:{}".format(err))
    path = "./dataset-{}/data-{}/{}".format(dataset_id, data_id, fileinfo["name"])
    with open(path, "wb") as f:
        for chunk in res.iter_content(chunk_size=10240):
            f.write(chunk)
            f.flush()
    print(log_prefix + "ファイルダウンロードを終了")
# scenes = scene_search_by_json("sample.json")
scenes = search_scene_by_json("%%JSON_FILE_NAME%%")
print(scenes)
downloadable_scenes = filter_downloadable_scenes(scenes)
files = fetch_downloadable_files(downloadable_scenes)
file_num = len(files)
print("ダウンロード予定ファイル数 : " + str(file_num))
print("ダウンロード予定ファイルサイズ合計 : " + "{:,}".format(sum([d.get("fileinfo")["size_bytes"] for d in files])) + "Bytes")    
if confirm("ダウンロードを開始しますか?"):
    print("ファイルダウンロードを開始")
    for i, v in enumerate(files):
        print("[{}/{}]".format(i + 1, file_num))
        download_data_file(v["dataset_id"], v["id"], v["fileinfo"])
    print("-----スクリプトを終了-----")

まとめ

以上、Traveler APIを利用し、シーンを検索して衛星データをダウンロードする方法を紹介しました。

他にもいろいろなTraveler APIがありますので実際に触ってみて下さい。