Yandex Cloud
Поиск
Связаться с намиПодключиться
  • Документация
  • Блог
  • Все сервисы
  • Статус работы сервисов
    • Популярные
    • Инфраструктура и сеть
    • Платформа данных
    • Контейнеры
    • Инструменты разработчика
    • Бессерверные вычисления
    • Безопасность
    • Мониторинг и управление ресурсами
    • Машинное обучение
    • Бизнес-инструменты
  • Все решения
    • По отраслям
    • По типу задач
    • Экономика платформы
    • Безопасность
    • Техническая поддержка
    • Каталог партнёров
    • Обучение и сертификация
    • Облако для стартапов
    • Облако для крупного бизнеса
    • Центр технологий для общества
    • Облако для интеграторов
    • Поддержка IT-бизнеса
    • Облако для фрилансеров
    • Обучение и сертификация
    • Блог
    • Документация
    • Контент-программа
    • Мероприятия и вебинары
    • Контакты, чаты и сообщества
    • Идеи
    • Истории успеха
    • Тарифы Yandex Cloud
    • Промоакции и free tier
    • Правила тарификации
  • Документация
  • Блог
Проект Яндекса
© 2025 ООО «Яндекс.Облако»
Yandex Object Storage
    • Все руководства
    • Получение статистики запросов к объекту с использованием S3 Select
    • Получение статистики посещения сайта с использованием S3 Select
    • Получение статистики запросов к объектам с использованием Yandex Query
    • Анализ поресурсной детализации расходов
    • Шифрование на стороне сервера
    • Интеграция L7-балансировщика с CDN и Object Storage
    • Сине-зеленое и канареечное развертывание версий сервиса
    • Анализ логов с использованием DataLens
    • Монтирование бакетов к файловой системе хостов Yandex Data Processing
    • Использование Object Storage в Yandex Data Processing
    • Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
    • Подключение бакета как диска в Windows
    • Миграция данных из Yandex Data Streams с помощью Yandex Data Transfer
    • Использование гибридного хранилища в Yandex Managed Service for ClickHouse®
    • Загрузка данных из Yandex Managed Service for OpenSearch в Yandex Object Storage с помощью Yandex Data Transfer
    • Автоматическое копирование объектов из бакета в бакет
    • Регулярное асинхронное распознавание аудиофайлов в бакете
    • Обучение модели в Yandex DataSphere на данных из Object Storage
    • Подключение к Object Storage из VPC
    • Перенос данных в Yandex Managed Service for PostgreSQL с использованием Yandex Data Transfer
    • Загрузка данных в Yandex Managed Service for Greenplum® с помощью Yandex Data Transfer
    • Загрузка данных в Yandex Managed Service for ClickHouse® с помощью Yandex Data Transfer
    • Загрузка данных в Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Обмен данными между Yandex Managed Service for ClickHouse® и Yandex Data Processing
    • Загрузка данных из Yandex Managed Service for YDB с помощью Yandex Data Transfer
    • Хостинг статического сайта на фреймворке Gatsby в Object Storage
    • Миграция базы данных из Managed Service for PostgreSQL в Object Storage
    • Обмен данными между Yandex Managed Service for ClickHouse® и Yandex Data Processing
    • Импорт данных из Yandex Managed Service for PostgreSQL в Yandex Data Processing с помощью Sqoop
    • Импорт данных из Yandex Managed Service for MySQL® в Yandex Data Processing с помощью Sqoop
    • Миграция данных из Yandex Object Storage в Yandex Managed Service for MySQL® с помощью Yandex Data Transfer
    • Миграция базы данных из Yandex Managed Service for MySQL® в Yandex Object Storage
    • Выгрузка данных Greenplum® в холодное хранилище Yandex Object Storage
    • Загрузка данных из Яндекс Директ в витрину Yandex Managed Service for ClickHouse® с использованием Yandex Cloud Functions, Yandex Object Storage и Yandex Data Transfer
    • Миграция данных из Elasticsearch в Yandex Managed Service for OpenSearch
    • Загрузка состояний Terraform в Object Storage
    • Блокировка состояний Terraform с помощью Managed Service for YDB
    • Визуализация данных Yandex Query
    • Публикация обновлений для игр
    • Резервное копирование ВМ с помощью Хайстекс Акура
    • Резервное копирование в Object Storage с помощью CloudBerry Desktop Backup
    • Резервное копирование в Object Storage через Duplicati
    • Резервное копирование в Object Storage с помощью Bacula
    • Резервное копирование в Object Storage с помощью Veeam Backup
    • Резервное копирование в Object Storage с помощью Veritas Backup Exec
    • Резервное копирование кластера Managed Service for Kubernetes в Object Storage
    • Разработка пользовательской интеграции в API Gateway
    • Сокращатель ссылок
    • Хранение журналов работы приложения
    • Разработка навыка Алисы и сайта с авторизацией
    • Создание интерактивного serverless-приложения с использованием WebSocket
    • Развертывание веб-приложения с использованием Java Servlet API
    • Разработка Telegram-бота
    • Репликация логов в Object Storage с помощью Fluent Bit
    • Репликация логов в Object Storage с помощью Data Streams
    • Загрузка аудитных логов в SIEM ArcSight
    • Загрузка аудитных логов в SIEM Splunk
    • Создание сервера MLFlow для логирования экспериментов и артефактов
    • Работа с данными с помощью Yandex Query
    • Федеративные запросы к данным с помощью Query
    • Распознавание архива изображений в Vision OCR
    • Конвертация видео в GIF на Python
    • Автоматизация задач с помощью Managed Service for Apache Airflow™
    • Обработка файлов детализации в сервисе Yandex Cloud Billing
    • Развертывание веб-приложения с JWT-авторизацией в API Gateway и аутентификацией в Firebase
    • Поиск событий Yandex Cloud в Yandex Query
    • Поиск событий Yandex Cloud в Object Storage
    • Создание внешней таблицы на базе таблицы из бакета с помощью конфигурационного файла
    • Миграция базы данных из Google BigQuery в Managed Service for ClickHouse®
  • Справочник Terraform
  • Метрики Monitoring
  • Аудитные логи Audit Trails
  • Логи бакета
  • История изменений
  • Вопросы и ответы
  • Обучающие курсы

В этой статье:

  • DistCp
  • Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
  • Копирование с использованием CredentialProvider
  • Копирование файлов с передачей ключей в аргументах
  • Оптимизация чтения файлов из Object Storage
  • Оптимизация записи файлов в Object Storage
  • Использование коммиттеров S3A
  • Настройки Apache Hadoop
  • Настройки Apache Spark
  • Использование s3fs
  • Использование Object Storage в Spark
  1. Практические руководства
  2. Использование Object Storage в Yandex Data Processing

Использование Object Storage в Yandex Data Processing

Статья создана
Yandex Cloud
Обновлена 24 апреля 2025 г.
  • DistCp
    • Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера
    • Копирование с использованием CredentialProvider
    • Копирование файлов с передачей ключей в аргументах
  • Оптимизация чтения файлов из Object Storage
  • Оптимизация записи файлов в Object Storage
    • Использование коммиттеров S3A
    • Настройки Apache Hadoop
    • Настройки Apache Spark
  • Использование s3fs
  • Использование Object Storage в Spark

В этом разделе рассмотрены различные способы доступа к объектам из бакетов Object Storage для процессов, запущенных на кластерах Yandex Data Processing.

Примечание

Настройте сеть кластера перед настройкой доступа к сервисам Yandex Cloud и интернет-ресурсам.

На скорость чтения и записи файлов в бакеты влияют настройки компонентов:

  • Настройки, заданные при создании кластера, влияют на все запущенные в кластере задания.
  • Настройки, заданные при создании заданий, переопределяют настройки уровня кластера и могут быть индивидуальными для каждого задания.

DistCpDistCp

Для копирования файлов из Object Storage в HDFS используйте утилиту DistCp. Она предназначена для копирования данных как внутри кластера, так и между кластерами и внешними хранилищами.

Для аутентификации в Object Storage можно использовать один из подходов:

  1. Использовать IAM-токен сервисного аккаунта кластера.
  2. Использовать CredentialProvider.
  3. Передавать параметры access key и secret key статических ключей доступа при создании задачи.

Доступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластераДоступ в S3 с аутентификацией через IAM-токен сервисного аккаунта кластера

  1. При создании кластера укажите сервисный аккаунт. Если кластер уже создан, добавьте сервисный аккаунт с помощью кнопки Редактировать в консоли управления.

    Сервисному аккаунту должны быть назначены роли:

    • dataproc.agent — чтобы сервисный аккаунт мог получать информацию о состоянии хостов кластера, заданиях и лог-группах.
    • dataproc.provisioner — чтобы сервисный аккаунт мог взаимодействовать с автоматически масштабируемой группой ВМ. Тогда будет доступно автомасштабирование подкластеров.
  2. У сервисного аккаунта должен быть доступ к нужному бакету. Для этого выдайте сервисному аккаунту права в ACL бакета, либо роль storage.viewer или storage.editor.

    Подробнее про эти роли см. в документации Object Storage.

Например, получите список файлов, находящихся в публичном бакете yc-mdb-examples по пути dataproc/example01/set01. Для этого подключитесь к кластеру и выполните команду:

hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01

Результат:

Found 12 items
-rw-rw-rw-   1 root root   19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
-rw-rw-rw-   1 root root   21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
-rw-rw-rw-   1 root root   20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/
...

Копирование с использованием CredentialProviderКопирование с использованием CredentialProvider

Чтобы воспользоваться провайдером для хранения секретов, разместите эти секреты в компонентах, которым нужен доступ к Object Storage. Для этого можно воспользоваться JCEKS (Java Cryptography Extension KeyStore).
В примере вы создадите файл с секретами, который затем разместите в HDFS:

  1. Укажите статический и секретный ключи, например:

    hadoop credential create fs.s3a.access.key \
           -value <статический_ключ> \
           -provider localjceks://file/home/jack/yc.jceks && \
    hadoop credential create fs.s3a.secret.key \
           -value <секретный_ключ> \
           -provider localjceks://file/home/jack/yc.jceks
    
  2. Перенесите файл секрета в локальный HDFS:

    hdfs dfs -put /home/jack/yc.jceks /user/root/
    
  3. Скопируйте файл из Object Storage непосредственно в HDFS:

    hadoop distcp \
           -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
           -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
           -update \
           -skipcrccheck \
           -numListstatusThreads 10 \
           s3a://yc-mdb-examples/dataproc/example01/set01 \
           hdfs://<хост_HDFS>/<путь>/
    

    <хост_HDFS> — целевой сервер HDFS, который вы используете. Сервер по умолчанию можно получить с помощью команды:

    hdfs getconf -confKey fs.defaultFS
    

Пример команды копирования файлов из бакета:

hadoop distcp \
       -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
       -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
       -update \
       -skipcrccheck \
       -numListstatusThreads 10 \
       s3a://yc-mdb-examples/dataproc/example01/set01 \
       hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/

Копирование файлов с передачей ключей в аргументахКопирование файлов с передачей ключей в аргументах

Вы можете не создавать файл секретов, а передавать ключи в аргументах команды:

hadoop distcp \
       -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
       -D fs.s3a.bucket.dataproc-examples.access.key=<статический_ключ> \
       -D fs.s3a.bucket.dataproc-examples.secret.key=<секретный_ключ> \
       -update \
       -skipcrccheck \
       -numListstatusThreads 10 \
       s3a://yc-mdb-examples/dataproc/example01/set01 \
       hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/

Оптимизация чтения файлов из Object StorageОптимизация чтения файлов из Object Storage

Способ чтения данных из бакета определяется настройкой fs.s3a.experimental.input.fadvise. Ее значение зависит от версии используемого образа:

  • В образах версий 1.0—1.4 по умолчанию используется значение sequential. Оно подходит для операций последовательного чтения файлов, но для произвольного работает медленно. Если вы чаще используете произвольный доступ к файлам, добавьте в свойства компонентов кластера или укажите в настройках задания значение random.
  • В образе версии 2.0 по умолчанию используется значение normal: работа с файлами происходит в последовательном режиме, но если приложение выполняет операции произвольного доступа, режим автоматически переключается на random.

Подробнее об используемых версиях компонентов см. в разделе Среда исполнения.

Оптимизация записи файлов в Object StorageОптимизация записи файлов в Object Storage

Чтобы увеличить скорость записи файлов в Object Storage, вы можете:

  • использовать коммиттеры S3A;
  • задать настройки Apache Hadoop;
  • задать настройки Apache Spark.

Использование коммиттеров S3AИспользование коммиттеров S3A

Коммиттеры S3A — входящий в состав Apache Hadoop набор программных модулей для записи данных в объектное хранилище по протоколу S3, обеспечивающий эффективное и приближенное к атомарному подтверждение выполненных изменений. Подробнее см. в документации Apache Hadoop и Apache Spark.

Примечание

Коммиттеры S3A не используются и не требуются для работы с таблицами, управляемыми средствами библиотеки DeltaLake, которая реализует собственную логику для работы с данными в объектном хранилище.

Есть три основных режима работы коммиттеров S3A:

Режим Среда Необходим HDFS Запись в партиционированные
таблицы
Скорость записи
directory MapReduce, Spark Да* Полная перезапись Обычная
magic MapReduce, Spark Нет (запись напрямую в S3) Не поддерживается Максимальная
partitioned Spark Да* Замена и дополнение партиций Обычная

* В режимах directory и partitioned не производится проверка на фактическое наличие HDFS для хранения промежуточных данных. При этом часть заданий могут успешно отрабатывать без HDFS, однако в сложных заданиях могут возникнуть проблемы, проявляющиеся в виде ошибок «файл не найден» или неполной записи результатов задания в Object Storage.

Чтобы включить коммиттеры S3A, задайте значения следующих настроек:

  • core:fs.s3a.committer.magic.enabled : true, если задания будут использовать режим magic.
  • core:fs.s3a.committer.name — используемый режим по умолчанию: directory, magic или partitioned.
  • core:fs.s3a.committer.staging.abort.pending.uploads : false для Hadoop 3.2.2 в составе образа Yandex Data Processing версии 2.0 или core:fs.s3a.committer.abort.pending.uploads : false для Hadoop 3.3.2 в составе образа 2.1, если несколько параллельно работающих заданий выполняют запись в одну и ту же таблицу.
  • core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory.
  • spark:spark.hadoop.fs.s3a.committer.name — используемый режим по умолчанию: directory, magic или partitioned.
  • spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter.
  • spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.
  • (опционально) core:fs.s3a.committer.staging.conflict-mode — действие при обнаружении в целевой таблице уже существующих партиций с данными (при использовании режима partitioned):
    • append — данные в существующей партиции дополняются новыми данными.
    • fail — при попытке перезаписи существующей партиции задание останавливается с ошибкой.
    • replace — данные в существующей партиции заменяются данными новой партиции.

Используемый режим работы коммиттеров S3A может переопределяться для конкретного задания путем установки настроек fs.s3a.committer.name и spark.hadoop.fs.s3a.committer.name в необходимое значение (directory, magic или partitioned).

Не следует менять значение по умолчанию для настройки spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version, поскольку Yandex Object Storage не поддерживает атомарные переименования каталогов.

Настройки Apache HadoopНастройки Apache Hadoop

Способ записи данных в бакет Object Storage определяется настройкой core:fs.s3a.fast.upload. Ее значение зависит от версии используемого образа:

  • В образах версий 1.0—1.4 по умолчанию используется значение false для экономии RAM. Укажите для этой настройки значение true в свойствах компонентов кластера или настройках задания. Это ускорит запись в бакет больших файлов и предотвратит переполнение хранилищ узлов.
  • В образе версии 2.0 настройка fs.s3a.fast.upload включена по умолчанию.

При необходимости укажите значения других настроек, отвечающих за режим записи в Object Storage:

  • fs.s3a.committer.threads — количество потоков, выполняющих фиксацию изменений в Object Storage в конце работы задания.
  • fs.s3a.connection.maximum — количество разрешенных соединений с Object Storage.
  • fs.s3a.connection.timeout — максимальное время ожидания соединения с Object Storage в миллисекундах.
  • fs.s3a.fast.upload.active.blocks — максимальное количество блоков в одном потоке вывода.
  • fs.s3a.fast.upload.buffer — тип буфера, используемого для временного хранения загружаемых данных:
    • disk — данные сохраняются в каталог, указанный в настройке fs.s3a.buffer.dir;
    • array — используются массивы в куче JVM;
    • bytebuffer — используется RAM вне кучи JVM.
  • fs.s3a.max.total.tasks — размер очереди операций над бакетом Object Storage, которые не могут быть запущены из-за исчерпания рабочих потоков.
  • fs.s3a.multipart.size — размер кусков (chunk) в байтах, на которые будут разбиты данные при копировании или выгрузке в бакет.
  • fs.s3a.threads.max — количество рабочих потоков в менеджере загрузок (AWS Transfer Manager).

Примечание

Большие значения этих параметров могут привести к увеличению потребления вычислительных ресурсов на хостах кластера Yandex Data Processing.

Подробнее см. в документации Apache Hadoop и разделе Свойства компонентов.

Настройки Apache SparkНастройки Apache Spark

При доступе к данным в Object Storage из заданий Spark рекомендовано использовать значение true для настройки spark.sql.hive.metastorePartitionPruning.

При работе с данными в формате Parquet в заданиях Spark рекомендованы настройки:

  • spark.hadoop.parquet.enable.summary-metadata : false
  • spark.sql.parquet.mergeSchema : false
  • spark.sql.parquet.filterPushdown : true

При работе с данными в формате Parquet и использовании динамической перезаписи партиций рекомендованы настройки:

  • spark:spark.sql.sources.partitionOverwriteMode : dynamic
  • spark:spark.sql.parquet.output.committer.class : org.apache.parquet.hadoop.ParquetOutputCommitter
  • spark:spark.sql.sources.commitProtocolClass : org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol

При работе с данными в формате Orc в заданиях Spark рекомендованы настройки:

  • spark.sql.orc.cache.stripe.details.size : 10000
  • spark.sql.orc.filterPushdown : true
  • spark.sql.orc.splits.include.file.footer : true

Задания, создающие или обновляющие большое количество (сотни и тысячи) партиций в таблицах, могут тратить много времени на актуализацию записей о партициях в кластере Hive Metastore. Для ускорения этого процесса увеличьте значения следующих настроек:

  • hive:datanucleus.connectionPool.maxPoolSize — максимальный размер пула соединений к БД Metastore.
  • hive:hive.metastore.fshandler.threads — количество рабочих потоков, выполняющих фоновые операции с файловой системой Metastore.
  • spark:spark.sql.addPartitionInBatch.size — количество партиций, актуализируемых за один вызов Metastore. Оптимальное значение — 10 × <значение_настройки_hive:hive.metastore.fshandler.threads> или выше.

Примечание

Чрезмерно большие значения перечисленных параметров могут привести к исчерпанию системных ресурсов Metastore. Большой размер пула соединений к БД Metastore может потребовать изменения настроек и увеличения объема вычислительных ресурсов кластера.

Подробнее см. в документации Apache Spark и разделе Свойства компонентов.

Использование s3fsИспользование s3fs

s3fs позволяет монтировать бакеты Object Storage посредством Fuse. Более подробно об использовании утилиты можно узнать на странице s3fs.

Использование Object Storage в SparkИспользование Object Storage в Spark

Spark Shell
PySpark Shell

Реализуйте нужный вариант доступа:

  • С использованием JCEKS:

    sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
    sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>");
    
  • По ключу доступа и секрету:

    sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
    sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
    sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    sc.hadoopConfiguration.set("fs.s3a.access.key","<ключ_доступа>");
    sc.hadoopConfiguration.set("fs.s3a.secret.key","<секрет_бакета>");
    

После этого можно читать файл из Object Storage:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")

Выберите способ доступа:

  • Доступ к объектам Object Storage c использованием JCEKS:

    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
    sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<путь_к_файлу_JCEKS>")
    
  • Доступ по ключу доступа и секрету бакета:

    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
    sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
    sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<ключ_доступа>")
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<секрет_бакета>")
    

Получив доступ, вы можете читать файл напрямую из Object Storage:

from pyspark.sql import SQLContext

sql = SQLContext(sc)
df = sql.read.parquet("s3a://<имя_бакета>/<путь_к_объекту>")

Подробнее см. на странице Настройки Spark для работы с Yandex Object Storage.

Была ли статья полезна?

Предыдущая
Монтирование бакетов к файловой системе хостов Yandex Data Processing
Следующая
Импорт данных из Object Storage, обработка и экспорт в Managed Service for ClickHouse®
Проект Яндекса
© 2025 ООО «Яндекс.Облако»