GlueからS3のParquetに対してUpsertを行う

AWS
AWSGlue

GlueからS3のParquetファイルに対してUpsert(データが存在すればInsert、存在しなければUpdate)処理を行う方法について記載します。

ParquetファイルをGlueテーブルとしている場合、実質的にGlueテーブルのUpsert処理を行うことが可能です。

スポンサーリンク

まとめ

大きく分けて次の3パターンでUpsertを実装することができます。

実際のデータを利用したパフォーマンステスト等を実施し、チューニングを経て実行時間の制約を満たすことが可能なパターンを利用するのがよろしいかと存じます。

なお、全て主キー相当の列が存在しないと実装できないことにご注意いただければと存じます。

  • 【PySpark】更新ごとに増分する列を利用する(※1)
  • 【PySpark】差分となる主キーを利用する
  • 【AWS Data Wrangler】merge_upsert_tableを利用する(※2)

※1 例えば「最終更新日時」等の、更新ごとに増分するような列が存在しない場合は利用できない手法です。

※2 AWS Data Wranglerは内部的にPandasが利用されているため、Glue JobがCommand Failed with Exit Code 10.で失敗するで紹介した事象が発生する可能性があります。少量のデータに向いています。

以下、詳細です。

サンプルデータ

サンプルとして次のような「元データ」と「差分データ」を利用します。処理の結果、最終的に「想定データ」のようになることを目指します。

元データ

主キーはid列を想定しています。last_updated列には最終更新のタイムスタンプが付与される想定です。

+---+------------+-------------------+
| id|     message|       last_updated|
+---+------------+-------------------+
|  1|Not Updated.|2021-07-24 00:00:00|
|  2|Not Updated.|2021-07-24 00:00:00|
|  3|Not Updated.|2021-07-24 00:00:00|
|  4|Not Updated.|2021-07-24 00:00:00|
|  5|Not Updated.|2021-07-24 00:00:00|
+---+------------+-------------------+

差分データ

idが1および4のデータについては、last_updatedおよびmessageが更新されています。

また、元データに存在しない6および7のデータが存在しています。

+---+------------+-------------------+
| id|     message|       last_updated|
+---+------------+-------------------+
|  1|    Updated.|2021-07-25 00:00:00|
|  4|    Updated.|2021-07-25 00:00:00|
|  6|Not Updated.|2021-07-25 00:00:00|
|  7|Not Updated.|2021-07-25 00:00:00|
+---+------------+-------------------+

想定データ

元データに存在しているidの1と4については更新が行われ、存在していない6と7については挿入が行われることを想定します。

+---+------------+-------------------+
| id|     message|       last_updated|
+---+------------+-------------------+
|  1|    Updated.|2021-07-25 00:00:00|
|  2|Not Updated.|2021-07-24 00:00:00|
|  3|Not Updated.|2021-07-24 00:00:00|
|  4|    Updated.|2021-07-25 00:00:00|
|  5|Not Updated.|2021-07-24 00:00:00|
|  6|Not Updated.|2021-07-25 00:00:00|
|  7|Not Updated.|2021-07-25 00:00:00|
+---+------------+-------------------+

【PySpark】更新ごとに増分する列値を利用する

PySpark上で結合を行い、主キーが重複している行は、増分する列値を利用して古い方を削除するというものです。大きく次のような流れにて対応します。

  1. S3から元データを読み込む
  2. S3から差分データを読み込む
  3. 元データと差分データを結合する
  4. 結合したデータを主キーと値が増分する列でソートする
  5. 主キーが同じ行が存在する場合は古い行を削除する
  6. S3に新規ファイルとして書き込む
  7. 新規で書き込んだファイルを読み取る
  8. 既存の元データに上書きする

なお、残念ながらoverwriteを利用し、直接既存の元データを上書きすることはできませんでした。

と言いますのも、当該メソッドを利用すると、まず、書き込み先に存在するファイルが削除され、その後、書き込み処理が行われるためです。書き込みデータは既存ファイルのデータから読み取りを行って作成されるものなので、書き込みタイミングで削除されていると処理に失敗してしまいます。

したがって、いったん別ファイルとして保存し、その内容から元のデータが存在する場所に上書きで書き込む形としています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import row_number
from pyspark.sql.window import *
from pyspark.sql.functions import col
from pyspark.sql.functions import desc

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
sqlContext = SQLContext(spark.sparkContext, spark)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# S3から元データを読み込む
base = spark.read.parquet("s3://xxxxx/base/")
## base.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|Not Updated.|2021-07-24 00:00:00|
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  4|Not Updated.|2021-07-24 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## +---+------------+-------------------+

# S3から差分データを読み込む
delta = spark.read.parquet("s3://xxxxx/delta/")
## delta.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|    Updated.|2021-07-25 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# 元データと差分データを結合する
upsert = base.union(delta)
## upsert.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|Not Updated.|2021-07-24 00:00:00|
## |  1|    Updated.|2021-07-25 00:00:00|
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  4|Not Updated.|2021-07-24 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# 結合したデータを主キーと値が増分する列でソートする
upsert = upsert.withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy(desc("last_updated"))))
## upsert.show()
## +---+------------+-------------------+-------+
## | id|     message|       last_updated|row_num|
## +---+------------+-------------------+-------+
## |  1|    Updated.|2021-07-25 00:00:00|      1|
## |  1|Not Updated.|2021-07-24 00:00:00|      2|
## |  6|Not Updated.|2021-07-25 00:00:00|      1|
## |  3|Not Updated.|2021-07-24 00:00:00|      1|
## |  5|Not Updated.|2021-07-24 00:00:00|      1|
## |  4|    Updated.|2021-07-25 00:00:00|      1|
## |  4|Not Updated.|2021-07-24 00:00:00|      2|
## |  7|Not Updated.|2021-07-25 00:00:00|      1|
## |  2|Not Updated.|2021-07-24 00:00:00|      1|
## +---+------------+-------------------+-------+

# 主キーが同じ行が存在する場合は古い行を削除する
upsert = upsert.filter(col("row_num") == 1).drop(col("row_num"))
## upsert.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|    Updated.|2021-07-25 00:00:00|
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# ※Upsertの一連の処理をまとめることも可能(下記のようになる)
# upsert = base.union(delta).withColumn("row_num", row_number().over(Window.partitionBy("id").orderBy(desc("last_updated")))).filter(col("row_num") == 1).drop(col("row_num"))

# S3に新規ファイルとして書き込む
upsert.write.mode("overwrite").parquet("s3://xxxxx/tmp/")

# 新規で書き込んだファイルを読み取る
upsert = spark.read.parquet("s3://xxxxx/tmp/")

# 既存の元データに上書きする
upsert.write.mode("overwrite").parquet("s3://xxxxx/base/")

job.commit()

【PySpark】差分となる主キーを利用する

PySpark上で差分対象の主キーを抽出し、既存データから当該主キーを持つ行を削除したDataFrameに、差分対象を結合するというものです。大きく次のような流れにて対応します。

  1. S3から元データを読み込む
  2. S3から差分データを読み込む
  3. 差分データから主キーを読み込む
  4. 元データから差分データに存在する主キーを削除した一時的なデータを作成する
  5. 一時的なデータと差分データを結合する
  6. S3に新規ファイルとして書き込む
  7. 新規で書き込んだファイルを読み取る
  8. 既存の元データに上書きする

この手法は、更新ごとに増加するような列がない場合でも対応できる点がメリットです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
sqlContext = SQLContext(spark.sparkContext, spark)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# S3から元データを読み込む
base = spark.read.parquet("s3://xxxxx/base/")
## base.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|Not Updated.|2021-07-24 00:00:00|
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  4|Not Updated.|2021-07-24 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## +---+------------+-------------------+

# S3から差分データを読み込む
delta = spark.read.parquet("s3://xxxxx/delta/")
## delta.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|    Updated.|2021-07-25 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# 差分データから主キーを読み込む
delta_primary_keys = delta.select("id").rdd.flatMap(lambda x: x).collect()
## delta_primary_keys
## [1, 4, 6, 7]

# 元データから差分データに存在する主キーを削除した一時的なデータを作成する
upsert = base.filter(~base["id"].isin(delta_primary_keys))
## upsert.show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## +---+------------+-------------------+

# 一時的なデータと差分データを結合する
upsert = upsert.unionAll(delta)
## upsert.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|    Updated.|2021-07-25 00:00:00|
## |  2|Not Updated.|2021-07-24 00:00:00|
## |  3|Not Updated.|2021-07-24 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  5|Not Updated.|2021-07-24 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# S3に新規ファイルとして書き込む
upsert.write.mode("overwrite").parquet("s3://xxxxx/tmp/")

# 新規で書き込んだファイルを読み取る
upsert = spark.read.parquet("s3://xxxxx/tmp/")

# 既存の元データに上書きする
upsert.write.mode("overwrite").parquet("s3://xxxxx/base/")

job.commit()

【AWS Data Wrangler】merge_upsert_tableを利用する

AWS Data Wranglerを利用し、Glueテーブルに紐づくS3ファイルを直接Upsertするというものです。

AWS Data Wranglerは、下記のようにGlueのジョブパラメータに値を入れるだけでGlueジョブから利用することが可能です。

Key: –additional-python-modules

Value: pyarrow==2,awswrangler

Install — AWS Data Wrangler 2.10.0 documentation

大きく次のような流れにて対応します。

  1. S3から差分データを読み込む
  2. PandasのDataFrameに変換する
  3. merge_upsert_tableメソッドを呼び出す

なお、冒頭にも記載していますが、当該メソッドはPandasを利用するため、たとえ対象が分割可能なファイルであっても、1つのノードのメモリ上に全てのデータが乗り切らないと処理できません。

したがって、大きなデータでの当該機能の利用は向いていません。小さなデータであれば、非常に実装もシンプルで良いものかと存じます。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext
from awsglue.job import Job
import awswrangler as wr
import pandas as pd

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
sqlContext = SQLContext(spark.sparkContext, spark)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# S3から差分データを読み込む
delta = spark.read.parquet("s3://xxxxx/delta/")
## delta.sort("id").show()
## +---+------------+-------------------+
## | id|     message|       last_updated|
## +---+------------+-------------------+
## |  1|    Updated.|2021-07-25 00:00:00|
## |  4|    Updated.|2021-07-25 00:00:00|
## |  6|Not Updated.|2021-07-25 00:00:00|
## |  7|Not Updated.|2021-07-25 00:00:00|
## +---+------------+-------------------+

# PandasのDataFrameに変換する
delta = delta.toPandas()

# merge_upsert_tableメソッドを呼び出す
wr.s3.merge_upsert_table(delta_df=delta, database="<database>", table="<table>", primary_key=["id"])

job.commit()

最後に

ParquetのUpsertを実施したいケースはそれなりにあると思うので、そういったメソッドが既に存在していると思いましたが、執筆時点では確認することができませんでした。

そのため、回りくどい実装方法となっていますが、少しでもご参考になれば幸いです。

コメント

タイトルとURLをコピーしました