blog

DeNAのエンジニアが考えていることや、担当しているサービスについて情報発信しています

2025.03.25 技術記事

Kotlin + Jupyter Notebookによるインフラデータ分析

by toliner

#情シス #sre #aws #kotlin #jupyter

はじめに

こんにちは。IT本部 IT戦略部 システム基盤グループのtoliner(トリナー)です。システム基盤グループでは、社内サービスのインフラ運用を担当し、安定稼働やコスト削減に取り組んでいます。
今回は、コストコントロール施策の分析タスクで、Kotlinとkotlin-jupyter (Kotlin Kernel for Jupyter notebooks) を活用した事例を紹介します。
なお、この記事のファイル名や数値等は一部加工しています。ご了承ください。

この記事で扱う内容

  • kotlin-jupyterの利用例
  • 実際に使ってみた感想
  • 利用時の知見

この記事で扱わない内容

  • kotlin-jupyterの環境構築方法
  • Jupyter Notebookや各種ライブラリの使い方
  • Kotlinの基本構文

kotlin-jupyterとは

KotlinはJetBrainsが開発したマルチパラダイム・マルチプラットフォームなプログラミング言語です。弊社では主にAndroidアプリや一部プロダクトのサーバーサイドで利用しています。
Jupyter Notebookは、ブラウザ上でコード実行やテキスト、画像を組み合わせたドキュメントを作成・共有できるインタラクティブな計算ノートブックです。データ分析やAI関連で広く利用されています。
Jupyter Notebookの計算部分は「Kernel」として独立しており、Kernelを追加・変更することで様々なプログラミング言語に対応可能です。
kotlin-jupyterはKotlin用のKernel実装で、Jupyter Notebook上でKotlinコードをPythonと同じ感覚で実行できます。

利用例その1 : S3バケットのコスト分析・可視化

システム基盤グループでは、日々多種多様なログをS3に格納しています。その中で、クラウド移行初期に作成され放置されたバケットがあり、コスト的に不適切な状態でした。
このバケットのコスト最適化を目的に、内容物を解析し、削減施策を検討しました。

前処理

データ分析には、まず必要なデータを取得する必要があります。今回は、バケット内のファイル作成日とサイズの一覧を以下のコマンドで抽出しました。

aws s3 ls バケット名 --recursive | grep -v '/$' > s3-ls.txt

すると、以下のような出力が得られます。

$ head -n 4 s3-ls.txt
2023-11-16 06:21:10     132907 LOGFILE_A_1.csv
2023-11-21 06:19:56    5464860 LOGFILE_B_1.log.gz
2023-11-22 06:20:57      13739 LOGFILE_A_2.csv
2023-11-22 06:20:57     988998 LOGFILE_B_2.log.gz

$ wc --lines s3-ls.txt
2967237 s3-ls.txt

AWS CLIは結果を整形して出力してくれるので、このままだと機械的には少し扱いにくいです。そこで、追加の前処理をして扱いやすいように加工します。

import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.util.regex.Pattern

Files.newBufferedReader(Paths.get("s3-ls.txt")).use { reader ->
    Files.newBufferedWriter(Paths.get("s3-ls.csv"), StandardOpenOption.CREATE).use { writer ->
        writer.appendLine("作成日,作成時間,ファイルサイズ,ファイル名")
        reader.lineSequence().map {
            it.split(Pattern.compile("\\s+")).joinToString(separator = ",")
        }.forEach {
            writer.appendLine(it)
        }
    }
}

処理としては非常に単純で、以下の処理を全行に対して実施しています。

  1. 1行読む
  2. 正規表現を用いて"1文字以上の空白"で文字列を分割する
  3. 分割されたものを",“で繋げる
  4. 繋げたものを書きだす

性能については改善の余地がありますが、処理対象が高々300万行程度かつ1度しかやらない処理なため、手短に実装することを優先しました。 なお、この処理もJupyter Notebook上で実行しています。 s3-preprocess これで、以下のようなCSVができます。

$ head -n 4 s3-ls.csv
2023-11-16,06:21:10,132907,LOGFILE_A_1.csv
2023-11-21,06:19:56,5464860,LOGFILE_B_1.log.gz
2023-11-22,06:20:57,13739,LOGFILE_A_2.csv
2023-11-22,06:20:57,988998,LOGFILE_B_2.log.gz

データ分析・可視化

CSVの読み込み・加工には Dataframe ライブラリを、データの描画にはlets-plot-kotlinを利用します。 以下の記述でDataframeを用いたCSV読み込みができます。

%useLatestDescriptors
%use dataframe, lets-plot
val df = DataFrame.read("s3-ls.csv", delimiter = ',')

これで準備が完了しました。では、最初に各月毎にどれくらいのファイルが格納されているのかを集計してみましょう。 この時のコードは以下です。

// データ加工
val perMonth = df.convert(date).with { YearMonth.of(it.year, it.month) }
        .groupBy("作成日").aggregate {
            sumOf { fileSize() } into "ファイルサイズ"
            count() into "file count"
        }.sortBy("作成日")

// 設定・便利系関数
dataFrameConfig.display.rowsLimit = 200
val iaCostPerGBMonth = 0.0138
val date by column<LocalDate>("作成日")
val fileSize by column<Long>("ファイルサイズ")
fun renderData(dataFrame: DataFrame<Any?>): DataFrame<Any?> {
    return dataFrame
        .add("cost ($/Month)") { "ファイルサイズ"<Long>() * iaCostPerGBMonth / 1_073_741_824 /*2^10^3*/ }
        .convert("ファイルサイズ").with { (it as Long).toHumanReadableString() }
}

fun Long.toHumanReadableString() = when {
    this == Long.MIN_VALUE || this < 0 -> "N/A"
    this < 1024L -> "$this B"
    this <= 0xfffccccccccccccL shr 40 -> "%.1f KiB".format(toDouble() / (0x1 shl 10))
    this <= 0xfffccccccccccccL shr 30 -> "%.1f MiB".format(toDouble() / (0x1 shl 20))
    else -> "%.1f GiB".format(toDouble() / (0x1 shl 30))
}

// 描画用データ出力
renderData(perMonth)

これを実行すると以下のようになります。

s3 summary per month

データ加工のフローは以下の通りです。

  1. 作成日を年月に変換
  2. 作成月毎にグループ化
  3. ファイルサイズ・ファイル数で作成月毎に集計
  4. 作成月でソート
  5. ファイルサイズからS3の保存コストを計算して列を追加する
  6. ファイルサイズをSI接頭辞を用いた形式に変換する

次に、このデータをグラフにしてみましょう。

letsPlot(
  perMonth.convert("作成日").with { it?.toString() }.toMap()
){ x = "作成日"; y = "ファイルサイズ" } + geomLine() + ggsize(2000, 500)

graph of s3 summary per month

この集計を行ったのは2024年11月の初頭なため、2024年11月だけ値が異常になっています。

コスト計算・予測

グラフで見ると分かる通り、時期によってログの保存量に差があります。これはその時々のログ設定によるものです。 コスト削減効果を把握するために、将来の一定期間における保存量との推定も行いたいのですが、全期間の平均では、時期による変動が考慮されないため、実態に合わない可能性があります。そこで、過去半年間の保存量の平均を予測に用いました。

今回は、コスト削減施策として「保存期間の設定」と「ストレージクラスの変更」を考えていたので、「現状」・「保存期間を5年に設定」・「保存期間を5年に設定+ストレージクラスの変更」の3通りのグラフを作成しました。

val beginYearMonth = perMonth.first().作成日
val baseYearMonth: YearMonth = perMonth.last().作成日
val averageFileSize = perMonth.takeLast(7).dropLast(1).mean { ファイルサイズ }.toLong()
val estimate = dataFrameOf(
    "作成日" to List(72) { baseYearMonth.plusMonths(it.toLong()) },
    "ファイルサイズ" to List(72) { averageFileSize },
)
val extendedData = perMonth.dropLast(1).concat(estimate).sortBy("作成日")
val fiveYear = dataFrameOf(
    "作成日" to List(extendedData.count()) { beginYearMonth.plusMonths(it.toLong()) },
    "ファイルサイズ" to List(extendedData.count()) { extendedData[max(0, it - 59)..it].sumOf { fileSize() } },
)
val extendedDataCumSum = extendedData.cumSum { fileSize }
val extendedPricing = extendedDataCumSum.add("料金 $/月") { calcCost(fileSize()) }
val fiveYearPricing = fiveYear.add("料金 $/月") { calcCost(fileSize()) }
val fiveYearGlacierPricing = fiveYear.add("料金 $/月") { calcCostG(fileSize()) }
letsPlot(
    extendedPricing.convert("作成日").with { it?.toString() }.add("系統"){ "全期間" }.concat(
        fiveYearPricing.convert("作成日").with { it?.toString() }.add("系統"){ "5年間" },
        fiveYearGlacierPricing.convert("作成日").with { it?.toString() }.add("系統"){ "5年間・Glacier" }
    ).sortBy("作成日").toMap()
){ x = "作成日"; y = "料金 $/月"; color = "系統" } + geomLine() + ggsize(2000, 800)

graph of s3 pricing estimate

DataFrameはデータの結合が容易に行えるので、予測値のDataFrameを生成して結合することで、過去データ + 推定データのグラフが簡単に作れました。 「保存期間を5年に設定+ストレージクラスの変更」パターンでも過去分の料金がグラフで描画されているのは本来不要ではありますが、そこだけ加工をする手間をかける意味もないと判断してこのような形になっています。

所感

全体的に使いやすく、利用体験は良かったです。 特に、以下の点が使いやすいと感じました。

  • Kotlinが通常通り使える : Kotlinという言語そのものが非常に使いやすい言語なので、Kotlinを使える事自体が快適です。これを求めてこの技術選定を行いましたが、期待していた通りのものでした。
  • DataFrameライブラリが直感的に使える : Kotlin標準ライブラリのコレクション操作関数に近いインターフェースであるため、Kotlinユーザーにとっては使いやすいです。
  • Jupyter Notebookがインターフェースとして優秀 :
    • 実行結果のキャッシュ機能 : Jupyter Notebook / kotlin-jupyterには、セル単位での実行結果のキャッシュ機能があります。これにより、データ読み込みなどの重い処理は一度行うだけで済み、その後の操作は高速に実行できます。
    • 補完機能 : IntelliJには劣りますが、最低限実用レベルの補完機能があり、変数名や関数等は補完が効きました。
    • インタラクティブ性 : コード記述と実行がインタラクティブに行えるため、効率的に試行錯誤ができました。

総じて、インタラクティブにコードの記述と実行が行えるプラットフォームと、その上でKotlinを利用できる、という体験は非常に良い物でした。

一方で、以下の点を課題に感じました。

  • Extension Properties API : Extension Properties APIは、データのプロパティに応じた拡張プロパティをライブラリ側が自動生成してくれる機能です。そのため、記述が簡潔化できて非常に便利である一方、convert()等を用いてデータ処理を行う際に、プロパティ名をそのままに型変換を行ってしまうと、以後そのプロパティにExtension Properties APIでアクセスすると型周りのエラーが出ます。これは仕様上どうしようもない事だとは思いますが、たまに引っかかって別の書き方で型を明示する、ということが必要になりました。
  • 数値型関連 : 型はプログラマーに多大な恩恵をもたらすものの、一部の場面において邪魔になる場合があります。特に、sumを用いて集計を行う際に、集計元の値がInt型だと集計後もInt型として処理されます。しかし、ある程度の大きさのデータを扱っていると、Int型を集計するとその限界を超えてLong型の領域まで行ってしまう、という事は多々あります。こういう時、Python等の動的型付け言語は自動でより上位の大きさの型として処理してくれますが、Kotlinにおいては自前で処理する必要があります。今回のデータ処理ではちょうどそれが多発するような物でしたので、少し面倒でした。

利用例その2 : VPCフローログの解析

システム基盤グループの管理しているAWSアカウントで、NATゲートウェイの料金が高い(4桁ドル後半)という問題がありました。 NATゲートウェイのコスト削減のためには、NATゲートウェイを通る通信を減らすしかありません。そのためにはキャッシュやVPCエンドポイントの利用が一般的ですが、それらの施策を効果的に実施するためにはまず何の通信が多いのかを調査する必要があります。 そこで利用できるのがVPCフローログです。 VPCフローログはAWSの機能で、VPC又は特定ENIを通過したパケットの情報をキャプチャ・集計して出力してくれます。 フローログはS3に出力できるため、フローログの解析にはAmazon Athenaを利用するのが一般的です。しかし、今回は以下の理由からkotlin-jupyterを利用しました。

  • Amazon Athenaを利用した経験が無かった
  • クエリ毎に料金がかかるので、試行錯誤しているうちに料金がかさんでしまう懸念があった
  • S3の分析時にJupyter Notebookの魅力を感じていた

前処理

フローログには膨大なデータがあります。特にNAT Gatewayを通る通信だと、1つの通信でもNAT Gateway発着で2種類のログが記録されます。 また、フローログは非常に多数のファイルに分かれているため、単純なフィルタリングだけでなくこれらの集計処理も必要です。 そのため、S3の時よりも複雑な前処理をしなければなりません。 今回、前処理はJupyter Notebookを使わず独立したKotlinプログラムで実施しました。処理時間が長くなる事・Notebookでやるには複雑性が高いことが理由です。 前処理は大まかに以下のような内容を実施しました。

  1. 各ログファイルについて、NATゲートウェイ宛の通信ログを抽出・集約
  2. 通信ログをインバウンド通信・アウトバウンド通信に分ける
  3. 通信先毎にパケット数・データ転送量を集計する
  4. 結果をCSVに出力する

フローログに記録されているのはIPパケットのキャプチャなので、宛先はIPアドレスです。これをドメイン名に紐づけないと分析は困難です。 IPとドメインの紐づけには、DNSの解決ログを用いるのが一般的です。今回の環境では、Windows ServerのDNSサーバーを利用していたので、そのログからドメインとIPアドレスの紐づけを作成しました。

分析・可視化

まず、DataFrameを用いてCSVを読み取ります。

val flowlogIn = DataFrame.readCsv("flowlog-in.csv")
val flowlogOut = DataFrame.readCsv("flowlog-out.csv")
val dns = DataFrame.readCsv("dns.csv")

インバウンドとアウトバウンド、それぞれの転送量を確認します。

flowlogIn.sumOf{ bytes() }.toHumanReadableString()
flowlogOut.sumOf{ bytes() }.toHumanReadableString()

flowlog transfer rate

インバウンドの方が圧倒的に多いです。よって、以後はインバウンド通信に絞って分析していきます。

IPアドレス毎に集計してみます。

val sortedIn = flowlogIn.sortByDesc { bytes() }
sortedIn.take(20).convert { bytes() }.with { it.toHumanReadableString() }

flowlog transfer amount per ip

上位のIPアドレスとの通信が大部分を占めていることが分かりました。

AWSのVPCフローログは、pktSrcAwsService / pktDstAwsServiceというフィールドがあり、これは宛先サービスがAWSのものである場合にここに対象サービスが入ります。ここで特定のサービスの比率が大きいなら、VPCエンドポイントを用いることで比較的簡単にコスト削減が見込めます。 今回はインバウンドが多いので、通信先のAWSサービスはpktSrcAwsServiceに記録されています。

flowlogIn.implode { pktSrcAddress and packets and bytes }
    .convert { "bytes"<List<Number>>() }
    .with { it.sumOf{ it.toLong() } }
    .convert { "packets"<List<Number>>() }
    .with { it.sumOf{ it.toLong() } }
    .convert { bytes() }.with { it.toHumanReadableString() }

flowlog transfer amount per aws service

AWSのサービスへの通信はわずかで、大部分が外部サービスとの通信であることが分かりました。

次に、通信量の多い外部サービスについて分析します。 DNSのログと紐づけて見てみます。 ただ、今回の場合、何故かDNSのログに載っていないIPが多数ありました。そこで、一部は逆引きリクエストによりカバーしています。

flowlogIn.leftJoin(dnsWithReverse) { pktSrcAddress match right.ip }
    .implode { "fqdn"<String>() }
    .groupBy("fqdn")
    .values { pktSrcAddress and packets and bytes }
    .convert { "packets"<List<Long>>() }
    .with { it.sum() }
    .convert { "bytes"<List<Long>>() }
    .with { it.sum() }
    .takeTop20()

flowlog transfer amount per fqdn 転送量が一番多いのはMicrosoft系の複数のサービスで使われているIPアドレス、二番目以降はGitHubのようです。

これまでの分析結果から、ルートドメイン単位で集計することで、サービス単位のおおまかな傾向を把握できると考えられました。そこで、FQDNの代わりにルートドメインを紐づけます。

val rootDomainForIp = dnsWithReverse.add("rootDomain") { 
  fqdn.dropLast(1).split('.').takeLast(2).joinToString(separator=".") 
}.implode { 
  "rootDomain"<String>() and fqdn 
}.update { 
  "rootDomain"<List<String>>() 
}.with { it.distinct() }

flowlogIn.leftJoin(rootDomainForIp) { pktSrcAddress match right.ip }
    .groupBy { "rootDomain"<String>() }
    .values("packets","bytes","fqdn")
    .convert { "packets"<List<Long>>() }
    .with { it.sum() }
    .convert { "bytes"<List<Long>>() }
    .with { it.sum() }
    .convert { "fqdn"<List<List<String>?>>() }
    .with { it.flatMap { it ?: emptyList() }.distinct() }
    .takeTop20()

flowlog transfer amount per root domain

実装を簡素化するために、今回は example.co.jp のような属性型ドメイン名は考慮せず、単純に上位2レベル分をルートドメインとして扱っています。今回のケースではこれで支障がありませんでした。 やはりGitHubが最多、次点でMicrosoft系でしたが、ルートドメインで括ることでCDN系が上位に来ました。 また、画像にはありませんが、 cannonical.com等、ルートドメインで纏めることで上位に浮上していたドメインもありました。

所感

この分析は、先述のS3の分析の後にやっているので、DataFrameライブラリの扱いにある程度の慣れがあり、非常に快適に書けました。 先述の所感と比べて、追加で以下の利点を感じました。

  • DataFrameライブラリが多機能 : DataFrameライブラリにはjoinやimplode/explode等、1:多・多:多関係のデータを扱う際に便利な関数が揃っており、データ操作はほとんど公式に提供されている関数だけで済んで非常に便利でした。
  • DataFrameライブラリの公式ドキュメントが優秀 : 今回利用したimplodeという関数は初めて見るものでしたが、 公式ドキュメント にデータ付きで実行前後の内容が示されていたので、仕様を簡単に把握できました。複雑なコレクション操作関数は英語の説明文だけを読んでも、何をするのかよく分からず使ってみて挙動を知るしかない、という事が多々あります。そのため、具体的なデータの実例があるのは本当にありがたかったです。
  • Jupyter Notebookは共有が簡単 : この分析を実施した後にチーム内で結果を共有したのですが、URLを渡せばそれだけで共有できるというのも非常に便利でした。Markdownによるドキュメントとコードを混ぜて書けるので、このNotebookそのものがコードでありドキュメントであり実行結果のプレビューであるという、Jupyter Notebookというインターフェースの便利さも強く感じました。

まとめ

本記事では、kotlin-jupyterを実際のデータ分析タスクで利用した実例と、それを踏まえた所感を紹介しました。 総じてkotlin-jupyterによるJupyter Notebook + Kotlin + DataFrameライブラリの組み合わせは体験が良く、Kotlinに関して知見があり中~小規模なデータの解析をしたい人にはお勧めできます。 Kotlinという言語が好きな人はぜひ一度使ってみてください。

最後まで読んでいただき、ありがとうございます!
この記事をシェアしていただける方はこちらからお願いします。

recruit

DeNAでは、失敗を恐れず常に挑戦し続けるエンジニアを募集しています。