Использование Yandex Object Storage в Yandex Data Processing
В этом разделе рассмотрены различные способы доступа к объектам из бакетов Object Storage для процессов, запущенных на кластерах Yandex Data Processing.
Примечание
Настройте сеть кластера перед настройкой доступа к сервисам Yandex Cloud и интернет-ресурсам.
На скорость чтения и записи файлов в бакеты влияют настройки компонентов:
- Настройки, заданные при создании кластера, влияют на все запущенные в кластере задания.
- Настройки, заданные при создании заданий, переопределяют настройки уровня кластера и могут быть индивидуальными для каждого задания.
DistCp
Для копирования файлов из Object Storage в HDFS используйте утилиту DistCp
Для аутентификации в Object Storage можно использовать один из подходов:
- Использовать IAM-токен сервисного аккаунта кластера.
- Использовать CredentialProvider
. - Передавать параметры
access key
иsecret key
статических ключей доступа при создании задачи.
Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
-
При создании кластера укажите сервисный аккаунт. Если кластер уже создан, добавьте сервисный аккаунт с помощью кнопки Редактировать в консоли управления.
Сервисному аккаунту должны быть назначены роли:
- dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
- dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
-
У сервисного аккаунта должен быть доступ к нужному бакету. Для этого выдайте сервисному аккаунту права в ACL бакета, либо роль
storage.viewer
илиstorage.editor
.Подробнее про эти роли см. в документации Object Storage.
Например, получите список файлов, находящихся в публичном бакете
yc-mdb-examples
по путиdataproc/example01/set01
. Для этого подключитесь к кластеру и выполните команду:hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01
Результат:
Found 12 items -rw-rw-rw- 1 root root 19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet -rw-rw-rw- 1 root root 21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet -rw-rw-rw- 1 root root 20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/ ...
Копирование с использованием CredentialProvider
Чтобы воспользоваться провайдером для хранения секретов, разместите эти секреты в компонентах, которым нужен доступ к Object Storage. Для этого можно воспользоваться JCEKS
В примере вы создадите файл с секретами, который затем разместите в HDFS:
-
Укажите статический и секретный ключи, например:
hadoop credential create fs.s3a.access.key \ -value <статический_ключ> \ -provider localjceks://file/home/jack/yc.jceks && \ hadoop credential create fs.s3a.secret.key \ -value <секретный_ключ> \ -provider localjceks://file/home/jack/yc.jceks
-
Перенесите файл секрета в локальный HDFS:
hdfs dfs -put /home/jack/yc.jceks /user/root/
-
Скопируйте файл из Object Storage непосредственно в HDFS:
hadoop distcp \ -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \ -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \ -update \ -skipcrccheck \ -numListstatusThreads 10 \ s3a://yc-mdb-examples/dataproc/example01/set01 \ hdfs://<хост_HDFS>/<путь>/
<хост_HDFS>
— целевой сервер HDFS, который вы используете. Сервер по умолчанию можно получить с помощью команды:hdfs getconf -confKey fs.defaultFS
Пример команды копирования файлов из бакета:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Копирование файлов с передачей ключей в аргументах
Вы можете не создавать файл секретов, а передавать ключи в аргументах команды:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<статический_ключ> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<секретный_ключ> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Оптимизация чтения файлов из Object Storage
Способ чтения данных из бакета определяется настройкойfs.s3a.experimental.input.fadvise
. Ее значение зависит от версии используемого образа:
- В образах версий
1.0
—1.4
по умолчанию используется значениеsequential
. Оно подходит для операций последовательного чтения файлов, но для произвольного работает медленно. Если вы чаще используете произвольный доступ к файлам, добавьте в свойства компонентов кластера или укажите в настройках задания значениеrandom
. - В образе версии
2.0
по умолчанию используется значениеnormal
: работа с файлами происходит в последовательном режиме, но если приложение выполняет операции произвольного доступа, режим автоматически переключается наrandom
.
Подробнее об используемых версиях компонентов см. в разделе Среда исполнения.
Оптимизация записи файлов в Object Storage
Чтобы увеличить скорость записи файлов в Object Storage, вы можете:
Использование коммиттеров S3A
Коммиттеры S3A — входящий в состав Apache Hadoop набор программных модулей для записи данных в объектное хранилище по протоколу S3, обеспечивающий эффективное и приближенное к атомарному подтверждение выполненных изменений. Подробнее см. в документации Apache Hadoop
Примечание
Коммиттеры S3A не используются и не требуются для работы с таблицами, управляемыми средствами библиотеки DeltaLake
Есть три основных режима работы коммиттеров S3A:
Режим | Среда | Необходим HDFS | Запись в партиционированныетаблицы | Скорость записи |
---|---|---|---|---|
directory |
MapReduce, Spark | Да* | Полная перезапись | Обычная |
magic |
MapReduce, Spark | Нет (запись напрямую в S3) | Не поддерживается | Максимальная |
partitioned |
Spark | Да* | Замена и дополнение партиций | Обычная |
* В режимах directory
и partitioned
не производится проверка на фактическое наличие HDFS для хранения промежуточных данных. При этом часть заданий могут успешно отрабатывать без HDFS, однако в сложных заданиях могут возникнуть проблемы, проявляющиеся в виде ошибок «файл не найден» или неполной записи результатов задания в Object Storage.
Чтобы включить коммиттеры S3A, задайте значения следующих настроек:
core:fs.s3a.committer.magic.enabled : true
, если задания будут использовать режимmagic
.core:fs.s3a.committer.name
— используемый режим по умолчанию:directory
,magic
илиpartitioned
.core:fs.s3a.committer.staging.abort.pending.uploads : false
для Hadoop 3.2.2 в составе образа Yandex Data Processing версии 2.0 илиcore:fs.s3a.committer.abort.pending.uploads : false
для Hadoop 3.3.2 в составе образа 2.1, если несколько параллельно работающих заданий выполняют запись в одну и ту же таблицу.core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
.spark:spark.hadoop.fs.s3a.committer.name
— используемый режим по умолчанию:directory
,magic
илиpartitioned
.spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
.spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
.- (опционально)
core:fs.s3a.committer.staging.conflict-mode
— действие при обнаружении в целевой таблице уже существующих партиций с данными (при использовании режимаpartitioned
):append
— данные в существующей партиции дополняются новыми данными.fail
— при попытке перезаписи существующей партиции задание останавливается с ошибкой.replace
— данные в существующей партиции заменяются данными новой партиции.
Используемый режим работы коммиттеров S3A может переопределяться для конкретного задания путем установки настроек fs.s3a.committer.name
и spark.hadoop.fs.s3a.committer.name
в необходимое значение (directory
, magic
или partitioned
).
Не следует менять значение по умолчанию для настройки spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
, поскольку Yandex Object Storage не поддерживает атомарные переименования каталогов.
Настройки Apache Hadoop
Способ записи данных в бакет Object Storage определяется настройкой core:fs.s3a.fast.upload
. Ее значение зависит от версии используемого образа:
- В образах версий
1.0
—1.4
по умолчанию используется значениеfalse
для экономии RAM. Укажите для этой настройки значениеtrue
в свойствах компонентов кластера или настройках задания. Это ускорит запись в бакет больших файлов и предотвратит переполнение хранилищ узлов. - В образе версии
2.0
настройкаfs.s3a.fast.upload
включена по умолчанию.
При необходимости укажите значения других настроек
fs.s3a.committer.threads
— количество потоков, выполняющих фиксацию изменений в Object Storage в конце работы задания.fs.s3a.connection.maximum
— количество разрешенных соединений с Object Storage.fs.s3a.connection.timeout
— максимальное время ожидания соединения с Object Storage в миллисекундах.fs.s3a.fast.upload.active.blocks
— максимальное количество блоков в одном потоке вывода.fs.s3a.fast.upload.buffer
— тип буфера, используемого для временного хранения загружаемых данных:disk
— данные сохраняются в каталог, указанный в настройкеfs.s3a.buffer.dir
;array
— используются массивы в куче JVM;bytebuffer
— используется RAM вне кучи JVM.
fs.s3a.max.total.tasks
— размер очереди операций над бакетом Object Storage, которые не могут быть запущены из-за исчерпания рабочих потоков.fs.s3a.multipart.size
— размер кусков (chunk) в байтах, на которые будут разбиты данные при копировании или выгрузке в бакет.fs.s3a.threads.max
— количество рабочих потоков в менеджере загрузок (AWS Transfer Manager).
Примечание
Большие значения этих параметров могут привести к увеличению потребления вычислительных ресурсов на хостах кластера Yandex Data Processing.
Подробнее см. в документации Apache Hadoop
Настройки Apache Spark
При доступе к данным в Object Storage из заданий Spark рекомендовано использовать значение true
для настройки spark.sql.hive.metastorePartitionPruning
.
При работе с данными в формате Parquet в заданиях Spark рекомендованы настройки:
spark.hadoop.parquet.enable.summary-metadata : false
spark.sql.parquet.mergeSchema : false
spark.sql.parquet.filterPushdown : true
При работе с данными в формате Parquet и использовании динамической перезаписи партиций рекомендованы настройки:
spark:spark.sql.sources.partitionOverwriteMode : dynamic
spark:spark.sql.parquet.output.committer.class : org.apache.parquet.hadoop.ParquetOutputCommitter
spark:spark.sql.sources.commitProtocolClass : org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
При работе с данными в формате Orc в заданиях Spark рекомендованы настройки:
spark.sql.orc.cache.stripe.details.size : 10000
spark.sql.orc.filterPushdown : true
spark.sql.orc.splits.include.file.footer : true
Задания, создающие или обновляющие большое количество (сотни и тысячи) партиций в таблицах, могут тратить много времени на актуализацию записей о партициях в кластере Hive Metastore. Для ускорения этого процесса увеличьте значения следующих настроек:
hive:datanucleus.connectionPool.maxPoolSize
— максимальный размер пула соединений к БД Metastore.hive:hive.metastore.fshandler.threads
— количество рабочих потоков, выполняющих фоновые операции с файловой системой Metastore.spark:spark.sql.addPartitionInBatch.size
— количество партиций, актуализируемых за один вызов Metastore. Оптимальное значение —10 × <значение_настройки_hive:hive.metastore.fshandler.threads>
или выше.
Примечание
Чрезмерно большие значения перечисленных параметров могут привести к исчерпанию системных ресурсов Metastore. Большой размер пула соединений к БД Metastore может потребовать изменения настроек и увеличения объема вычислительных ресурсов кластера.
Подробнее см. в документации Apache Spark
Использование s3fs
s3fs
позволяет монтировать бакеты Object Storage посредством Fuse. Более подробно об использовании утилиты можно узнать на странице s3fs.
Использование Object Storage в Spark
Реализуйте нужный вариант доступа:
-
С использованием JCEKS:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>");
-
По ключу доступа и секрету:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("fs.s3a.access.key","<ключ_доступа>"); sc.hadoopConfiguration.set("fs.s3a.secret.key","<секрет_бакета>");
После этого можно читать файл из Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")
Выберите способ доступа:
-
Доступ к объектам Object Storage c использованием JCEKS:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>")
-
Доступ по ключу доступа и секрету бакета:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<ключ_доступа>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<секрет_бакета>")
Получив доступ, вы можете читать файл напрямую из Object Storage:
from pyspark.sql import SQLContext
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")
Подробнее см. на странице Настройки Spark для работы с Yandex Object Storage.