The Round

合同会社ナイツオの開発ブログ

静岡でGo言語やりたい人!!→こちら

GAE/Go TaskQueue その1~Pushキュー~

どうもこんにちわ!マツウラです。

今回はデータストアを取り上げたことに続いてタスクキューについてです。
Go言語版のタスクキュードキュメントを参考に見てゆこうと思います。

参考:Go — Google Developers Using Push Queues in Go

Pushキューの使い方 in Go

アプリケーションによって実行される各タスクは、Task typeオブジェクトです。
タスクオブジェクトはタスクのリクエストハンドラのURL、さらにタスクをパラメータ化するデータ・ペイロードオプションが含まれています。

PushキューはAppEngine環境でのみ使用可能です。
AppEngine外部からタスクにアクセスするには、Pullキューを使用して下さい。

Goアプリのキュー設定にはqueue.yamlを使用します(Go Task Queue Configurationを御覧ください)。

タスクをエンキュー(キューにタスクを格納)するには、taskqueue.Add関数を呼び出します。

次の例では、データストアのカウンターを増加するタスクハンドラ(/workder)を定義しています。
ユーザーがアクセス可能なリクエストハンドラを定義し、GETリクエストで現在のカウンタ値を表示、POSTリクエストでタスクをエンキューしています。

タスクキューはタスクを複数回実行可能なので、タスクが実行されるたびにカウンタが増加する本サンプルでは正確な値は取れないので注意して下さい。

const handlerHTML = `
{{range .}}
<p>{{.Name}}: {{.Count}}</p>
{{end}}
<p>Start a new counter:</p>
<form action="/counter" method="POST">
  <input type="text" name="name">
  <input type="submit" value="Add">
</form>
`

var handlerTemplate = template.Must(template.New("handler").Parse(handlerHTML))

type Counter struct {
    Name  string
    Count int
}

func init() {
    http.HandleFunc("/counter", CounterHandler)
    http.HandleFunc("/worker", Worker)
}

func CounterHandler(w http.ResponseWriter, r *http.Request) {
    c := appengine.NewContext(r)
    name := r.FormValue("name")

    if name != "" {
        t := taskqueue.NewPOSTTask("/worker", map[string][]string{"name": {name}})
        if _, err := taskqueue.Add(c, t, ""); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    }

    q := datastore.NewQuery("Counter")
    var counters []Counter
    if _, err := q.GetAll(c, &counters); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    err := handlerTemplate.Execute(w, counters); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    // OK
}

func Worker(w http.ResponseWriter, r *http.Request) {
    c := appengine.NewContext(r)
    name := r.FormValue("name")
    key := datastore.NewKey(c, "Counter", name, 0, nil)
    var counter Counter
    if err := datastore.Get(c, key, &counter); err == datastore.ErrNoSuchEntity {
        counter.Name = name
    } else if err != nil {
        c.Errorf("%v", err)
        return
    }
    counter.Count++
    if _, err := datastore.Put(c, key, &counter); err != nil {
        c.Errorf("%v", err)
        return
    }
    // OK
}

フォームに名前を入れて送信するたびに、カウンタが増加します。

Pushタスクの実行

ハンドラはURLのパス部分で決定されますが、これはTask構造体のPathフィールドを用いて指定されています。
また、ハンドラが実行されるモジュールおよびバージョンは次に従い決定します。

  • Task構造体を作成した際に設定した"Host"ヘッダー
  • queue.yamltargetディレクティブ

どちらかの条件を指定しない場合、タスクは次のルールに従うことを条件に、エンキューされた際と同じモジュール/バージョンで実行されます。

  • アプリケーションのデフォルトバージョンがタスクをエンキューすると、デフォルトバージョンでタスクが実行されます。
    アプリケーションがタスクをエンキューし、実際にタスクが実行される前にデフォルトのバージョンが変更されると、タスクは新しいデフォルトバージョンで実行されるので注意して下さい。
  • デフォルトバージョン以外でタスクをエンキューすると、タスクは常に同じバージョンで実行されます。

Note: dispatchファイルと一緒にmodulesを使用している場合は、タスクのURLを横取りして別のモジュールに再ルーティングされます。

タスクはキューに追加された際に実行される名前空間が決定されます。
デフォルトでは、タスクは作成されたプロセスの現在の名前空間で実行されます。
キューにタスクを追加する前に名前空間を設定することで、この動作をオーバーライド出来ます。

タスクは10分以内に200-299のHTTPレスポンスを送る必要があります。
もしタスクが失敗したら、AppEngineは設定に基づき再試行します。

Note: 10分以上掛かるタスクを処理する場合はModulesを使用して下さい。

タスクのリクエストヘッダ

タスクには次のヘッダーがAppEngineにより内部的に設定されています。

  • X-AppEngine-QueueName
    (キューの名前。)
  • X-AppEngine-TaskName
    (タスクの名前か、生成された一意のID)
  • X-AppEngine-TaskRetryCount
    (タスクのリトライ回数です。回数には利用可能インスタンスがなく、実行に至らなかった場合も含まれます。)
  • X-AppEngine-TaskExecutionCount
    (タスクが実行中に失敗した回数です。利用可能インスタンスがなく失敗した回数は含まれません。)
  • X-AppEngine-TaskETA
    (タスクの目標実行時間です。1970年1月1日からミリ秒で指定します。)

仮にユーザーがこれらのヘッダーを含んだリクエストを行ってもAppEngineはこれらのヘッダーを削除します。
例外は、ログインした管理者がテスト目的でヘッダーを設定する場合だけです。

また、AppEngineはIPアドレス0.1.0.2からタスクキューリクエストを行います。

タスクの実行順序

タスクの実行順序はいくつかの要因に依存します。

  • キュー内のタスク位置:
    タスクはFIFO(先入れ、先出し)に基づき処理されます。
    ようするに、先に入れたタスクから処理されます。

  • キュー内に残るタスク:
    キューが大量の未処理タスクを持っていると、システムは新しいタスクをキューの先頭にジャンプするようにスケジューリングします。

  • Task構造体のETAフィールド値:
    タスクが実行可能な開始時刻を指定します。
    AppEngineはPushタスクの処理開始を、指定したETAが過ぎるまで待ちます。

  • Task構造体のDelayフィールド値: タスク実行前に最小で何秒待機するか指定します。
    ETAとDelayフィールドはどちらか一方しか指定できないので注意してください。

タスクのリトライ

Pushタスクのリクエストハンドラが200-299のHTTPステータスコードを返した場合、AppEngineはタスクが正常に完了したとみなします。
この範囲外のステータスコードを返された場合、AppEngineはタスクを成功するまでリトライします。
大量のリクエストで溢れてしまうことを避けるため徐々にリトライの間隔は開きますが、リトライは最大で一時間に一回繰り返すようにスケジュールが組まれます。

スキームを設定することでタスクのリトライを設定することができます(queue.yamlのretry_parametersディレクティブ)。

タスクで処理するコードを実装する際には、タスクの結果が冪等性であるか検討することが重要です。
タスクキューは一度だけタスクを実行するよう設定されています。
しかし、システム障害時など例外的な状況では複数回実行されることもあります。
そのため、タスクが繰り返し実行されても問題がないか確認する必要があります。

エンドポイント

PushタスクはURLを介して実装を参照します。
例えば、タスクは/app_worker/fetch_feedのワーカーURLを使用してRSSフィードの取得と解析を行います。
指定のワーカーURLまたはデフォルトを使用することができます。
一般的にアプリケーション内であればタスクは任意のワーカーURLを使用することができます。
また全てのワーカーURLは相対URLとして指定する必要があります。

t := &taskqueue.Task{Path: "/path/to/my/worker"}
t := &taskqueue.Task{Path: "/path?a=b&c=d", Method: "GET"}

ワーカーURLを指定しない場合は、デフォルトワーカーURLが使用されます。

/_ah/queue/queue_name

タスクが独自のワーカーURLを持つ場合、その他のワーカーURLから呼び出されることはありません。
一度キューに追加されると、URLエンドポイントを変更することはできません。

Warning:
キューのデフォルトURLに対応するハンドラが未定義でも、タスクがワーカーURLを持たない場合キューのデフォルトURLが呼ばれます。
この結果、ログに試行したURLと一緒に404が記録されます。
404ではタスクは保存され、最終的に成功するまでリトライされます。
このような正常に完了しないタスクは、管理者コンソールを使用してクリア(またはpurge)することができます。

Purgeの注意点

キューがパージされたことをシステムが検出するのに約20秒かかります。
Pushキューではこの間もタスクを実行し続けます。

そのため、taskqueue.Purge呼び出しと同時に新しいタスクを作成しないでください。
パージの呼び出しから短時間で作成されたタスクも削除されてしまいます。

タスクのセキュアURL

管理者アカウントへのアクセスを制限することで、タスクのURLへユーザーがアクセスを禁じます。
タスクキューは管理者専用URLにアクセスすることも可能です。
これはapp.yamlのハンドラ設定で、login:adminを追加することで行えます。

Note: タスクキューはlogin:adminが指定されたURLパスを使えますが、login:requiredが指定されていると使用できません。

Pushキューと開発サーバー

アプリケーションが開発サーバーで実行されている際、タスクはプロダクション環境のように適切な時間に自動的に実行されます。

開発サーバーでタスクの実行を無効にするには、次のコマンドを実行します。

dev_appserver.py --disable_task_runnning

開発サーバーとプロダクションサーバーでは動作が異なります。
具体的には次のとおりです。

  • 開発サーバーは<rate>, <bucket-size>の属性を考慮しません。
    結果、タスクは可能な限りETAに近い時間に実行されます。
    rateを0に設定すると自動実行されてからタスクを止めることはできません。
  • 開発サーバーはタスクをリトライしません。
  • 開発サーバーは再起動するとキューの状態を破棄します。

半ばタスクキューの仕様説明みたいでした。

公式ドキュメントで少々気になったのが、Goのテンプレートでrepeatが使われていたことです。
コンパイル時にrepeat関数は定義されてないよっとエラーが出るのでコードが古い?のかもしれません。

次回はPullキューについてです。