Использование Yandex Object Storage в Yandex Data Processing
В этом разделе рассмотрены различные способы доступа к объектам из бакетов Object Storage для процессов, запущенных на кластерах Yandex Data Processing.
Примечание
Настройте сеть кластера перед настройкой доступа к сервисам Yandex Cloud и интернет-ресурсам.
На скорость чтения и записи файлов в бакеты влияют настройки компонентов:
- Настройки, заданные при создании кластера, влияют на все запущенные в кластере задания.
- Настройки, заданные при создании заданий, переопределяют настройки уровня кластера и могут быть индивидуальными для каждого задания.
DistCp
Для копирования файлов из Object Storage в HDFS используйте утилиту DistCp
Для аутентификации в Object Storage можно использовать один из подходов:
- Использовать IAM-токен сервисного аккаунта кластера.
- Использовать CredentialProvider
. - Передавать параметры
access keyиsecret keyстатических ключей доступа при создании задачи.
Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
-
При создании кластера укажите сервисный аккаунт. Если кластер уже создан, добавьте сервисный аккаунт с помощью кнопки Редактировать в консоли управления.
Сервисному аккаунту должны быть назначены роли:
- dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
- dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
Совет
Чтобы ограничить права сервисного аккаунта кластера (его IAM-токен доступен при выполнении заданий):
- Укажите отдельный сервисный аккаунт для автомасштабирования подкластеров при создании или изменении кластера через интерфейсы Yandex Cloud CLI, Terraform или API.
- Назначьте роль
dataproc.provisionerтолько этому аккаунту.
-
У сервисного аккаунта должен быть доступ к нужному бакету. Для этого выдайте сервисному аккаунту права в ACL бакета, либо роль
storage.viewerилиstorage.editor.Подробнее про эти роли см. в документации Object Storage.
Например, получите список файлов, находящихся в публичном бакете
yc-mdb-examplesпо путиdataproc/example01/set01. Для этого подключитесь к кластеру и выполните команду:hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01Результат:
Found 12 items -rw-rw-rw- 1 root root 19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet -rw-rw-rw- 1 root root 21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet -rw-rw-rw- 1 root root 20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/ ...
Копирование с использованием CredentialProvider
Чтобы воспользоваться провайдером для хранения секретов, разместите эти секреты в компонентах, которым нужен доступ к Object Storage. Для этого можно воспользоваться JCEKS
В примере вы создадите файл с секретами, который затем разместите в HDFS:
-
Укажите статический и секретный ключи, например:
hadoop credential create fs.s3a.access.key \ -value <статический_ключ> \ -provider localjceks://file/home/jack/yc.jceks && \ hadoop credential create fs.s3a.secret.key \ -value <секретный_ключ> \ -provider localjceks://file/home/jack/yc.jceks -
Перенесите файл секрета в локальный HDFS:
hdfs dfs -put /home/jack/yc.jceks /user/root/ -
Скопируйте файл из Object Storage непосредственно в HDFS:
hadoop distcp \ -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \ -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \ -update \ -skipcrccheck \ -numListstatusThreads 10 \ s3a://yc-mdb-examples/dataproc/example01/set01 \ hdfs://<хост_HDFS>/<путь>/<хост_HDFS>— целевой сервер HDFS, который вы используете. Сервер по умолчанию можно получить с помощью команды:hdfs getconf -confKey fs.defaultFS
Пример команды копирования файлов из бакета:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Копирование файлов с передачей ключей в аргументах
Вы можете не создавать файл секретов, а передавать ключи в аргументах команды:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<статический_ключ> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<секретный_ключ> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Оптимизация чтения файлов из Object Storage
Способ чтения данных из бакета определяется настройкойfs.s3a.experimental.input.fadvise. Ее значение зависит от версии используемого образа:
- В образах версий
1.0—1.4по умолчанию используется значениеsequential. Оно подходит для операций последовательного чтения файлов, но для произвольного работает медленно. Если вы чаще используете произвольный доступ к файлам, добавьте в свойства компонентов кластера или укажите в настройках задания значениеrandom. - В образе версии
2.0по умолчанию используется значениеnormal: работа с файлами происходит в последовательном режиме, но если приложение выполняет операции произвольного доступа, режим автоматически переключается наrandom.
Подробнее об используемых версиях компонентов см. в разделе Среда исполнения.
Оптимизация записи файлов в Object Storage
Чтобы увеличить скорость записи файлов в Object Storage, вы можете:
Использование коммиттеров S3A
Коммиттеры S3A — входящий в состав Apache Hadoop набор программных модулей для записи данных в объектное хранилище по протоколу S3, обеспечивающий эффективное и приближенное к атомарному подтверждение выполненных изменений. Подробнее см. в документации Apache Hadoop
Примечание
Коммиттеры S3A не используются и не требуются для работы с таблицами, управляемыми средствами библиотеки DeltaLake
Есть три основных режима работы коммиттеров S3A:
| Режим | Среда | Необходим HDFS | Запись в партиционированныетаблицы | Скорость записи |
|---|---|---|---|---|
directory |
MapReduce, Spark | Да* | Полная перезапись | Обычная |
magic |
MapReduce, Spark | Нет (запись напрямую в S3) | Не поддерживается | Максимальная |
partitioned |
Spark | Да* | Замена и дополнение партиций | Обычная |
* В режимах directory и partitioned не производится проверка на фактическое наличие HDFS для хранения промежуточных данных. При этом часть заданий могут успешно отрабатывать без HDFS, однако в сложных заданиях могут возникнуть проблемы, проявляющиеся в виде ошибок «файл не найден» или неполной записи результатов задания в Object Storage.
Чтобы включить коммиттеры S3A, задайте значения следующих настроек:
core:fs.s3a.committer.magic.enabled : true, если задания будут использовать режимmagic.core:fs.s3a.committer.name— используемый режим по умолчанию:directory,magicилиpartitioned.core:fs.s3a.committer.staging.abort.pending.uploads : falseдля Hadoop 3.2.2 в составе образа Yandex Data Processing версии 2.0 илиcore:fs.s3a.committer.abort.pending.uploads : falseдля Hadoop 3.3.2 в составе образа 2.1, если несколько параллельно работающих заданий выполняют запись в одну и ту же таблицу.core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory.spark:spark.hadoop.fs.s3a.committer.name— используемый режим по умолчанию:directory,magicилиpartitioned.spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter.spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.- (опционально)
core:fs.s3a.committer.staging.conflict-mode— действие при обнаружении в целевой таблице уже существующих партиций с данными (при использовании режимаpartitioned):append— данные в существующей партиции дополняются новыми данными.fail— при попытке перезаписи существующей партиции задание останавливается с ошибкой.replace— данные в существующей партиции заменяются данными новой партиции.
Используемый режим работы коммиттеров S3A может переопределяться для конкретного задания путем установки настроек fs.s3a.committer.name и spark.hadoop.fs.s3a.committer.name в необходимое значение (directory, magic или partitioned).
Не следует менять значение по умолчанию для настройки spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version, поскольку Yandex Object Storage не поддерживает атомарные переименования каталогов.
Настройки Apache Hadoop
Способ записи данных в бакет Object Storage определяется настройкой core:fs.s3a.fast.upload. Ее значение зависит от версии используемого образа:
- В образах версий
1.0—1.4по умолчанию используется значениеfalseдля экономии RAM. Укажите для этой настройки значениеtrueв свойствах компонентов кластера или настройках задания. Это ускорит запись в бакет больших файлов и предотвратит переполнение хранилищ узлов. - В образе версии
2.0настройкаfs.s3a.fast.uploadвключена по умолчанию.
При необходимости укажите значения других настроек
fs.s3a.committer.threads— количество потоков, выполняющих фиксацию изменений в Object Storage в конце работы задания.fs.s3a.connection.maximum— количество разрешенных соединений с Object Storage.fs.s3a.connection.timeout— максимальное время ожидания соединения с Object Storage в миллисекундах.fs.s3a.fast.upload.active.blocks— максимальное количество блоков в одном потоке вывода.fs.s3a.fast.upload.buffer— тип буфера, используемого для временного хранения загружаемых данных:disk— данные сохраняются в каталог, указанный в настройкеfs.s3a.buffer.dir;array— используются массивы в куче JVM;bytebuffer— используется RAM вне кучи JVM.
fs.s3a.max.total.tasks— размер очереди операций над бакетом Object Storage, которые не могут быть запущены из-за исчерпания рабочих потоков.fs.s3a.multipart.size— размер кусков (chunk) в байтах, на которые будут разбиты данные при копировании или выгрузке в бакет.fs.s3a.threads.max— количество рабочих потоков в менеджере загрузок (AWS Transfer Manager).
Примечание
Большие значения этих параметров могут привести к увеличению потребления вычислительных ресурсов на хостах кластера Yandex Data Processing.
Подробнее см. в документации Apache Hadoop
Настройки Apache Spark
При доступе к данным в Object Storage из заданий Spark рекомендовано использовать значение true для настройки spark.sql.hive.metastorePartitionPruning.
При работе с данными в формате Parquet в заданиях Spark рекомендованы настройки:
spark.hadoop.parquet.enable.summary-metadata : falsespark.sql.parquet.mergeSchema : falsespark.sql.parquet.filterPushdown : true
При работе с данными в формате Parquet и использовании динамической перезаписи партиций рекомендованы настройки:
spark:spark.sql.sources.partitionOverwriteMode : dynamicspark:spark.sql.parquet.output.committer.class : org.apache.parquet.hadoop.ParquetOutputCommitterspark:spark.sql.sources.commitProtocolClass : org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
При работе с данными в формате Orc в заданиях Spark рекомендованы настройки:
spark.sql.orc.cache.stripe.details.size : 10000spark.sql.orc.filterPushdown : truespark.sql.orc.splits.include.file.footer : true
Задания, создающие или обновляющие большое количество (сотни и тысячи) партиций в таблицах, могут тратить много времени на актуализацию записей о партициях в кластере Apache Hive™ Metastore. Для ускорения этого процесса увеличьте значения следующих настроек:
hive:datanucleus.connectionPool.maxPoolSize— максимальный размер пула соединений к БД Apache Hive™ Metastore.hive:hive.metastore.fshandler.threads— количество рабочих потоков, выполняющих фоновые операции с файловой системой Apache Hive™ Metastore.spark:spark.sql.addPartitionInBatch.size— количество партиций, актуализируемых за один вызов Apache Hive™ Metastore. Оптимальное значение —10 × <значение_настройки_hive:hive.metastore.fshandler.threads>или выше.
Примечание
Чрезмерно большие значения перечисленных параметров могут привести к исчерпанию системных ресурсов Apache Hive™ Metastore. Большой размер пула соединений к БД Apache Hive™ Metastore может потребовать изменения настроек и увеличения объема вычислительных ресурсов кластера.
Подробнее см. в документации Apache Spark
Использование s3fs
s3fs позволяет монтировать бакеты Object Storage посредством Fuse. Более подробно об использовании утилиты можно узнать на странице s3fs.
Использование Object Storage в Spark
Реализуйте нужный вариант доступа:
-
С использованием JCEKS:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>"); -
По ключу доступа и секрету:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net"); sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", ""); sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); sc.hadoopConfiguration.set("fs.s3a.access.key","<ключ_доступа>"); sc.hadoopConfiguration.set("fs.s3a.secret.key","<секрет_бакета>");
После этого можно читать файл из Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")
Выберите способ доступа:
-
Доступ к объектам Object Storage c использованием JCEKS:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>") -
Доступ по ключу доступа и секрету бакета:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<ключ_доступа>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<секрет_бакета>")
Получив доступ, вы можете читать файл напрямую из Object Storage:
from pyspark.sql import SQLContext
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")
Подробнее см. на странице Настройки Spark для работы с Yandex Object Storage.