AWSの機械学習開発用のプラットフォーム、Amazon SageMakerを使った機械学習モデル構築の一連の流れを実施してみたいと思います。
ブログ中でコードと、使用してみた感触についても述べていきます。
Amazon SageMakerとは
まず、Amazon SageMaker(以下、SageMaker)についてです。
SageMakerはAmazon Web Servicesの一つのサービスで、開発者やデータサイエンティスト向けに機械学習の開発とデプロイを一気通貫に行うことができるマネージドサービスです。

SageMakerは機械学習モデルの開発プロセスにおいてバラバラになりがちな環境をひとつのサービス内で提供することを目的に作られています。
SageMaker内でのIDEとしてはメジャーなJupyter notebookを利用できるほか、OSS系のフレームワークとしてScikit LearnをベースにラッピングしたオリジナルのSageMakerライブラリ(機能についてはコードサンプルと一緒に後述します)や、Tensorflow、Keras、ChainerなどメジャーなDeep Learningのフレームワークも標準で利用することができます。
また、一つのサービスでありつつも、実際の開発およびチームの切れ目となる部分はそれぞれ独立して機能できるように構築されています。
下の図をご覧ください。

まず、SageMaker上でのコーディングはノートブックインスタンスという独立したEC2上で行います。
チューニングをし、データを読み込んで学習させる負荷の高い処理は別途トレーニングインスタンスを立てて行うことができます。
トレーニングインスタンスは通常のEC2のようにインスタンスタイプとスケーリングセットを設定することができ、時間のかかりがちな学習プロセスを効率化することができます。トレーニングインスタンスは学習が終わると自動的に削除されるため、無駄にコストがかかることがありません。
トレーニング済みのモデルはSageMaker内ではなく、S3に配置するようになっています。また、トレーニング用のデータが大きい、または外部のシステムから連携されるような場合はそもそもトレーニングもS3から呼ぶことが可能です。
テストデータを投げて推論を返すのは別途推論インスタンスを使用します。推論インスタンスはAPIエンドポイントを公開することができますので、アプリケーションとして実装する際には、例えばデータが更新されたことをトリガにLambda経由でデータを投げ、推論インスタンスから値を返すようなアーキテクチャが考えられます。
また、SageMakerで開発したモデル以外にもBYOMとしてDocker化したモデルをECRにおいて置くことで推論インスタンスからアクセスが可能です。
SageMakerを使ってMachine Learningをやってみよう
さて、実際にSageMaker上でコーディングを行い、推論インスタンスにデータを投げて推論を返すところまでを実装してみましょう。
今回のデータセットにはKaggleの”Restaurant Revenue Prediction”というコンペのものを使います。ざっくりいうと、諸条件からレストランの売上を回帰モデル的に推定します。

使ったアルゴリズム
推論のアルゴリズムにはXGBoostというものを利用します。こちらはSageMakerにプリセットされているためConda installもpipも不要です。
アルゴリズムの中身については下記に委ね、本稿ではオミットします。

開発スタート
では、やっていきましょう。基本的にWebブラウザで完結させます(今回は)。
AWSのコンソールにログインし、->SageMaker->左側のペインからノートブックインスタンスを作成します。ノートブックインスタンスはあまり重くならないので最も軽く安いml.t2.mediumというインスタンスを選択しました。立ち上がりにしばらくかかります。

セットアップが完了すると、上の画像「アクション」のところにJupyterを開く、というリンクが現れるのでクリック。よく見慣れたJupyter notebookのUIにいい感じになります。
Jupyterの操作方法については沢山のレファレンスがあるため、そちらを参照ください。今回は開発用のノートブックとしてRestaurant Revenue Predictionというファイルを作り、このファイルと同じディレクトリにtrain.csvとtest.csvを配置しています。
では、importからです。
import pandas as pd import numpy as np #データを読み込む print('Loading dataset...') train = pd.read_csv("data/train.csv") test = pd.read_csv("data/test.csv") print('Finished')
# 前処理 train['WhatIsData'] = 'Train' test['WhatIsData'] = 'Test' test['revenue'] = 9999999999 alldata = pd.concat([train,test],axis=0).reset_index(drop=True) alldata["Open Date"] = pd.to_datetime(alldata["Open Date"]) alldata["Year"] = alldata["Open Date"].apply(lambda x:x.year) alldata["Month"] = alldata["Open Date"].apply(lambda x:x.month) alldata["Day"] = alldata["Open Date"].apply(lambda x:x.day) alldata["kijun"] = "2015-04-27" alldata["kijun"] = pd.to_datetime(alldata["kijun"]) alldata["BusinessPeriod"] = (alldata["kijun"] - alldata["Open Date"]).apply(lambda x: x.days) alldata = alldata.drop('Open Date', axis=1) alldata = alldata.drop('kijun', axis=1) # 訓練データ特徴量をリスト化 cat_cols = alldata.dtypes[alldata.dtypes=='object'].index.tolist() num_cols = alldata.dtypes[alldata.dtypes!='object'].index.tolist() other_cols = ['Id','WhatIsData'] # 余計な要素をリストから削除 cat_cols.remove('WhatIsData') #学習データ・テストデータ区別フラグ除去 num_cols.remove('Id') #Id削除 # カテゴリカル変数をダミー化 cat = pd.get_dummies(alldata[cat_cols])
読み込んで前処理まで終えました。次がSageMakerならではです。
sagemakerライブラリをインポートし、整形した訓練用データをS3にアップロードします。ちなみにCSVを読み込んださいのPandas Dataframeのままだと扱えないので、訓練データとテストデータにsplitしたあとそれぞれの特徴量と目的変数をarray化しています。
# 前処理されたデータをS3に書き込み import struct import io import boto3 import pickle import gzip import sagemaker from sagemaker import get_execution_role print('Processing Data...') # S3へのアップロード準備 sess = sagemaker.Session() role = get_execution_role() bucket = sess.default_bucket() prefix = 'notebook/xgboost/restaurant' # 書き込み用の関数 def to_libsvm(f_name, labels, values): with open(f_name,'w',encoding = 'utf-8') as f: content = '\n'.join(['{} {}'.format(label, ' '.join(['{}:{}'.format(i + 1, el) for i, el in enumerate(vec)])) for label, vec in zip(labels, values)]) f.write(content) # データ結合 all_data = pd.concat([alldata[other_cols],alldata[num_cols].fillna(0),cat],axis=1) train_set = all_data[all_data['WhatIsData']=='Train'].drop(['WhatIsData','Id'], axis=1).reset_index(drop=True) train_label = train_set["revenue"] train_set = train_set.drop(['revenue'], axis=1) test_set = all_data[all_data['WhatIsData']=='Test'].drop(['WhatIsData','Id'], axis=1).reset_index(drop=True) test_label = test_set["revenue"] test_set = test_set.drop(['revenue'], axis=1) #Dataframeを配列へ変換 train_set_array = train_set.values train_label_array = train_label.values test_set_array = test_set.values test_label_array = test_label.values to_libsvm('data.train', train_label_array, train_set_array) to_libsvm('data.test', test_label_array, test_set_array) # アップロード print('Uploading to S3...') train_input = sess.upload_data( path='data.train', key_prefix=prefix) test_input = sess.upload_data( path='data.test', key_prefix=prefix) print("Done!")
学習までの準備を終えました。次に学習に利用するインスタンスの条件と、学習に使うアルゴリズム、ハイパーパラメータを宣言します。
# 利用するモデル、インスタンスタイプの宣言 import sagemaker from sagemaker.amazon.amazon_estimator import get_image_uri from time import gmtime, strftime from sagemaker import get_execution_role print('Processing Data...') sess = sagemaker.Session() role = get_execution_role() bucket = sess.default_bucket() prefix = 'notebook/xgboost/restaurant' container = get_image_uri('ap-northeast-1', 'xgboost') training_job_name = 'DEMO-xgboost-Regression' + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) xgb = sagemaker.estimator.Estimator(container, role, train_instance_count=1, train_instance_type='ml.m4.4xlarge', output_path='s3://{}/{}/output'.format(bucket, prefix), sagemaker_session=sess) xgb.set_hyperparameters(num_round=25) print('Model Fitting...') xgb.fit({'train': train_input}, job_name = training_job_name) print('Processing Done!')
これで学習済みモデルがS3にアップロードされました。次いで推論インスタンスをセットアップし、先程学習したモデルをデプロイします。
こちらも数十秒〜数分程度かかるかと思います。
# モデルを推論インスタンスにデプロイ xgb_predictor = xgb.deploy(initial_instance_count=1, instance_type='ml.t2.medium')
デプロイが完了したら、インスタンスが受け取るデータを指定します。
# エンドポイントが受け取るcontentを指定 from sagemaker.predictor import csv_serializer xgb_predictor.content_type = 'text/csv' xgb_predictor.serializer = csv_serializer xgb_predictor.deserializer = None # モデルにフィットさせる関数 def mypredict(data): predictions = xgb_predictor.predict(data).decode('utf-8') #推定結果を , 区切りでarrayで返却 return np.fromstring(predictions[1:], sep=',')
これでデプロイ完了です。適当にテストデータをひとつ投げてみます。
# テストデータから1つを取り出す td = test_set_array[1] # 取り出したデータで推論を返す mypredict(td)
推論データとして”61072.5”と返って来ました。今回はお試しのため、精度については不問とします。
推論インスタンスは普通のEC2のように起動し続けると課金の対象となるため、不要になったら削除しておきます。これもコードから行うことができます。
# エンドポイントの削除 xgb_predictor.delete_endpoint()
さいごに:所感
今回はSageMakerを使って開発〜モデルをデプロイして推論を返す一連の流れを実施しました。
最後に一連を実施してみて感じたPros/Consについてつらつらと。
・メリット
コーディング、学習、推論が疎結合になっている
それぞれで別々のインスタンスやスケーリングを指定できる
学習データや構築済みモデルはS3に格納するので誤操作が起きにくい
→データサイエンティストとエンジニアが干渉することなく協業可能
DSはモデルを構築するところまで担当。S3に最新版モデルが格納される。
エンジニアは推論インスタンスとS3を組み込む形でアーキテクチャを検討する。
学習インスタンスは学習が終わると自動で消えるため、無駄がない
モデルをデプロイした推論インスタンスAPIを叩けば値を返すので、
アプリ化の際のアーキテクチャがわかりやすい
Jupyter、Anacondaなどがプリセットされていて環境構築の手間がかからない
Webブラウザで利用できる OSSなフレームワークが揃っている
・デメリット ノートブックインスタンスを立ち上げるのがめんどくさい
Jupyter以外のIDEに慣れているとちょっと不便と思うかも
SageMakerライブラリにセットされているアルゴリズムの数が少ない(LightGBMが欲しかった…)
日本語ベースのドキュメントが他のサービスに比べて少ない
今回は以上です。お読みいただきありがとうございました。
コメント