Using Object Storage in Yandex Data Processing
This section describes the ways processes running in Yandex Data Processing clusters can access objects in Object Storage buckets.
Note
Configure a cluster network before setting up access to Yandex Cloud services and internet resources.
Component settings impact bucket file read and write speed:
- The settings specified when creating a cluster affect all the jobs running in the cluster.
- The settings specified when creating jobs override cluster-level settings and can be job-specific.
DistCp
To copy files from Object Storage to HDFS, use DistCp.
To authenticate in Object Storage:
- Use the IAM token of the cluster service account.
- Use CredentialProvider.
- Provide the access key and secret key parameters of static access keys when creating jobs.
Accessing S3 with authentication via the IAM token of a cluster service account
- When creating a cluster, specify a service account. If you already have the cluster, add a service account using the Edit button in the management console.
Make sure to assign the following roles to the service account:
- dataproc.agent: To enable the service account to get info on cluster host states, jobs, and log groups.
- dataproc.provisioner: To enable the service account to work with an autoscaling instance group. This will enable subcluster autoscaling.
- The service account must have access to the relevant bucket. To grant access, either add permissions for the service account in the bucket ACL or assign it the storage.viewer or storage.editor role.
  For more information about these roles, see the Object Storage documentation.
For example, to get a list of files located in the yc-mdb-examples public bucket at dataproc/example01/set01, connect to the cluster and run the command:

hadoop fs -ls s3a://yc-mdb-examples/dataproc/example01/set01

Result:

Found 12 items
-rw-rw-rw-   1 root root   19327838 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_1.parquet
-rw-rw-rw-   1 root root   21120204 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_10.parquet
-rw-rw-rw-   1 root root   20227757 2019-09-13 17:17 s3a://yc-mdb-examples/dataproc/example01/set01/
...
Copying via CredentialProvider
To use a secret storage provider, place the secrets within the components that need access to Object Storage. To do this, you can use JCEKS.
In this example, you first create a file with secrets and then place it in HDFS:
- Specify the static and secret keys, e.g.:

  hadoop credential create fs.s3a.access.key \
      -value <static_key> \
      -provider localjceks://file/home/jack/yc.jceks && \
  hadoop credential create fs.s3a.secret.key \
      -value <secret_key> \
      -provider localjceks://file/home/jack/yc.jceks

- Copy the secrets file to your local HDFS:

  hdfs dfs -put /home/jack/yc.jceks /user/root/

- Copy the file from Object Storage directly to HDFS:

  hadoop distcp \
      -D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
      -D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
      -update \
      -skipcrccheck \
      -numListstatusThreads 10 \
      s3a://yc-mdb-examples/dataproc/example01/set01 \
      hdfs://<HDFS_host>/<path>/

  <HDFS_host>: Your target HDFS server. You can get the default server using this command:

  hdfs getconf -confKey fs.defaultFS
Example of the command to copy files from the bucket:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D hadoop.security.credential.provider.path=jceks://hdfs/user/root/yc.jceks \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Copying files by providing keys in arguments
Instead of creating a secrets file, you can provide keys in the command arguments:
hadoop distcp \
-D fs.s3a.bucket.dataproc-examples.endpoint=storage.yandexcloud.net \
-D fs.s3a.bucket.dataproc-examples.access.key=<static_key> \
-D fs.s3a.bucket.dataproc-examples.secret.key=<secret_key> \
-update \
-skipcrccheck \
-numListstatusThreads 10 \
s3a://yc-mdb-examples/dataproc/example01/set01 \
hdfs://rc1b-dataproc-m-d31bs470ivkyrz60.mdb.yandexcloud.net/user/root/datasets/set01/
Optimizing file reads from Object Storage
The fs.s3a.experimental.input.fadvise setting defines the mode of reading files from Object Storage. Its value depends on the image version used:
- Image versions 1.0 through 1.4 use sequential as the default value. It is a good choice for sequential file reads but slow for random access. If you use random file access more frequently, add random to the cluster component properties or job settings (see the sketch below).
- For version 2.0 images, the default is normal: file reads are sequential; however, if an application accesses data randomly, the random mode activates automatically.
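If a particular job needs the random mode, you can also pass the property through the job's Spark configuration. Below is a minimal PySpark sketch assuming the spark.hadoop.* prefix is used to hand the S3A property over to Hadoop; the bucket name and object path are placeholders:

```python
from pyspark.sql import SparkSession

# Minimal sketch: request the "random" S3A read mode for this job only.
# The spark.hadoop.* prefix passes the property down to the Hadoop configuration.
spark = (
    SparkSession.builder
    .appName("s3a-random-read-example")
    .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
    .getOrCreate()
)

# <bucket_name> and <object_path> are placeholders.
df = spark.read.parquet("s3a://<bucket_name>/<object_path>")
df.printSchema()
```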
For more information on the component versions used, see Runtime environment.
Optimizing file writes to Object Storage
To speed up file writes to Object Storage, you can:
- Use S3A committers.
- Adjust the Apache Hadoop settings.
- Adjust the Apache Spark settings.
Using S3A committers
S3A committers are Apache Hadoop software modules used for writing data to object storage over the S3 protocol and ensuring efficient and near-atomic commits of the changes made. For more information, see the Apache Hadoop documentation.
Note
S3A committers are not used or required for operations with tables managed using DeltaLake tools.
S3A committers run in three basic modes:
| Mode | Environment | HDFS required | Writing data to partitioned tables | Write speed |
|---|---|---|---|---|
| directory | MapReduce, Spark | Yes* | Complete overwrite | Standard |
| magic | MapReduce, Spark | No (data is written directly to S3) | Not supported | Maximum |
| partitioned | Spark | Yes* | Replacing and appending partitions | Standard |
* In the directory and partitioned modes, there are no checks for actual presence of HDFS for storing intermediate data. Some jobs may be successfully completed without HDFS. However, this may cause issues with complex jobs, such as file not found errors or incomplete writes of job results to Object Storage.
To enable S3A committers, set the following:
- core:fs.s3a.committer.magic.enabled : true if jobs will use the magic mode.
- core:fs.s3a.committer.name with directory, magic, or partitioned as the default mode.
- core:fs.s3a.committer.staging.abort.pending.uploads : false for Hadoop 3.2.2 as part of the Yandex Data Processing image 2.0, or core:fs.s3a.committer.abort.pending.uploads : false for Hadoop 3.3.2 as part of image 2.1, if multiple concurrent jobs are writing data to the same table.
- core:mapreduce.outputcommitter.factory.scheme.s3a : org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory.
- spark:spark.hadoop.fs.s3a.committer.name with directory, magic, or partitioned as the default mode.
- spark:spark.sql.parquet.output.committer.class : org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter.
- spark:spark.sql.sources.commitProtocolClass : org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.
- Optionally, core:fs.s3a.committer.staging.conflict-mode: the action to take if existing data partitions are found in the target table (for the partitioned mode):
  - append: Append new data to an existing partition.
  - fail: When attempting to overwrite an existing partition, the job terminates with an error.
  - replace: Existing partition data is replaced with new partition data.
The current S3A committer mode may be overridden for a specific job by setting fs.s3a.committer.name and spark.hadoop.fs.s3a.committer.name to directory, magic, or partitioned as required.
Do not change the default value for spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version because Yandex Object Storage does not support atomic folder renames.
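For illustration, here is a minimal PySpark sketch that selects the magic mode for a single job. It assumes the cluster-level properties listed above are already set (in particular, core:fs.s3a.committer.magic.enabled : true) and uses placeholder bucket and path names:

```python
from pyspark.sql import SparkSession

# Minimal sketch: use the "magic" S3A committer for this job only.
# Assumes core:fs.s3a.committer.magic.enabled is set to true at the cluster level.
spark = (
    SparkSession.builder
    .appName("s3a-magic-committer-example")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate()
)

# Write a small test dataset; <bucket_name> and <output_path> are placeholders.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("overwrite").parquet("s3a://<bucket_name>/<output_path>")
```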
Apache Hadoop settings
The core:fs.s3a.fast.upload setting defines how data is written to an Object Storage bucket. Its value depends on the image version used:
- Image versions 1.0 through 1.4 use false as the default value to save RAM. Set it to true in the cluster component properties or job settings. This will speed up writes of large files to a bucket and prevent node storage from filling up.
- In image 2.0, fs.s3a.fast.upload is enabled by default.
If required, specify values for other settings (a job-level example is shown at the end of this section):
- fs.s3a.committer.threads: Number of threads committing changes in Object Storage towards the end of the job.
- fs.s3a.connection.maximum: Number of allowed Object Storage connections.
- fs.s3a.connection.timeout: Maximum Object Storage connection timeout, in milliseconds.
- fs.s3a.fast.upload.active.blocks: Maximum number of blocks in a single output stream.
- fs.s3a.fast.upload.buffer: Type of buffer used for temporary storage of the upload data:
  - disk: Save data to the folder specified in fs.s3a.buffer.dir.
  - array: Use arrays in the JVM heap.
  - bytebuffer: Use RAM from outside the JVM heap.
- fs.s3a.max.total.tasks: Size of the queue of Object Storage bucket operations that cannot run because the thread limit has been reached.
- fs.s3a.multipart.size: Size of data chunks, in bytes, when copying or uploading to a bucket.
- fs.s3a.threads.max: Number of threads in the AWS Transfer Manager.
Note
Large values of these parameters may increase the usage of computing resources on Yandex Data Processing cluster hosts.
For more information, see the Apache Hadoop documentation.
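As an illustration, these Hadoop properties can also be passed to an individual Spark job through its configuration. The sketch below is a minimal PySpark example assuming the spark.hadoop.* prefix; the values are arbitrary examples rather than recommendations:

```python
from pyspark.sql import SparkSession

# Minimal sketch: pass S3A write settings to a single job via the spark.hadoop.* prefix.
# The values below are illustrative only; tune them for your workload and host resources.
spark = (
    SparkSession.builder
    .appName("s3a-write-tuning-example")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")          # buffered uploads (default is false on images 1.0-1.4)
    .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")   # buffer upload blocks on local disk
    .config("spark.hadoop.fs.s3a.multipart.size", "67108864")   # 64 MB data chunks
    .config("spark.hadoop.fs.s3a.connection.maximum", "100")    # allowed Object Storage connections
    .getOrCreate()
)
```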
Apache Spark settings
When accessing data in Object Storage from Spark jobs, we recommend setting spark.sql.hive.metastorePartitionPruning to true.
When working with data in Parquet format, use the following Spark job settings:
- spark.hadoop.parquet.enable.summary-metadata : false
- spark.sql.parquet.mergeSchema : false
- spark.sql.parquet.filterPushdown : true
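A minimal PySpark sketch applying these recommendations (together with spark.sql.hive.metastorePartitionPruning) when reading Parquet data; the bucket name and object path are placeholders:

```python
from pyspark.sql import SparkSession

# Minimal sketch: recommended settings for reading Parquet data from Object Storage.
spark = (
    SparkSession.builder
    .appName("parquet-read-settings-example")
    .config("spark.sql.hive.metastorePartitionPruning", "true")
    .config("spark.hadoop.parquet.enable.summary-metadata", "false")
    .config("spark.sql.parquet.mergeSchema", "false")
    .config("spark.sql.parquet.filterPushdown", "true")
    .getOrCreate()
)

df = spark.read.parquet("s3a://<bucket_name>/<object_path>")
df.show(10)
```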
When working with data in Parquet format and using dynamic partition overwrites, use the following settings:
- spark:spark.sql.sources.partitionOverwriteMode : dynamic
- spark:spark.sql.parquet.output.committer.class : org.apache.parquet.hadoop.ParquetOutputCommitter
- spark:spark.sql.sources.commitProtocolClass : org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
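For illustration, a minimal PySpark sketch that applies these settings and overwrites only the partitions present in the DataFrame being written; the bucket name, table path, and partition column are placeholders:

```python
from pyspark.sql import SparkSession

# Minimal sketch: dynamic partition overwrite when writing Parquet to Object Storage.
spark = (
    SparkSession.builder
    .appName("dynamic-partition-overwrite-example")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.parquet.hadoop.ParquetOutputCommitter")
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
    .getOrCreate()
)

df = spark.createDataFrame([("2024-01-01", 1), ("2024-01-02", 2)], ["dt", "value"])

# Only the partitions present in df (dt=2024-01-01 and dt=2024-01-02) are overwritten.
df.write.mode("overwrite").partitionBy("dt").parquet("s3a://<bucket_name>/<table_path>")
```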
When working with data in Orc format, use the following Spark job settings:
- spark.sql.orc.cache.stripe.details.size : 10000
- spark.sql.orc.filterPushdown : true
- spark.sql.orc.splits.include.file.footer : true
Jobs that create or update a massive number (hundreds or thousands) of table partitions may take a long time to update the partition records in the Apache Hive™ Metastore cluster. To speed up this process, increase the following values (a job-level example follows the note below):
- hive:datanucleus.connectionPool.maxPoolSize: Maximum size of the Apache Hive™ Metastore DB connection pool.
- hive:hive.metastore.fshandler.threads: Number of threads running background file system operations within Apache Hive™ Metastore.
- spark:spark.sql.addPartitionInBatch.size: Number of partitions updated per Apache Hive™ Metastore call. The optimal value is 10 × <hive:hive.metastore.fshandler.threads_setting_value> or higher.
Note
If these values are excessively high, you may run out of Apache Hive™ Metastore system resources. A large Apache Hive™ Metastore database connection pool may require updating the settings and ramping up the cluster's computing resources.
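As an illustration, the Spark-side value from the list above can be raised for a specific job. The sketch below is a minimal PySpark example; the value 150 hypothetically assumes hive:hive.metastore.fshandler.threads = 15 (10 × 15), and the hive: settings themselves are cluster component properties rather than job settings:

```python
from pyspark.sql import SparkSession

# Minimal sketch: raise the partition update batch size for a job that registers
# many partitions. The value 150 assumes hive.metastore.fshandler.threads = 15
# (10 x 15); this is a hypothetical example, not a recommendation.
spark = (
    SparkSession.builder
    .appName("metastore-partition-batch-example")
    .config("spark.sql.addPartitionInBatch.size", "150")
    .enableHiveSupport()
    .getOrCreate()
)
```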
For more information, see the Apache Spark documentation.
Using s3fs
s3fs allows mounting Object Storage buckets using FUSE. Read more about using s3fs.
Using Object Storage from Spark
Enable the access option you need in Spark Shell (Scala):
- Using JCEKS:

  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
  sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
  sc.hadoopConfiguration.set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>");

- Using your access key and secret key:

  sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.yandexcloud.net");
  sc.hadoopConfiguration.set("fs.s3a.signing-algorithm", "");
  sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
  sc.hadoopConfiguration.set("fs.s3a.access.key","<access_key>");
  sc.hadoopConfiguration.set("fs.s3a.secret.key","<bucket_secret>");
You can then read the file from Object Storage:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("s3a://<bucket_name>/<object_path>")
Select the access method in PySpark:
- Accessing the Object Storage objects using JCEKS:

  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
  sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  sc._jsc.hadoopConfiguration().set("hadoop.security.credential.provider.path", "jceks://hdfs/<path_to_JCEKS_file>")

- Reading a file using an access key and bucket secret:

  sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net")
  sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "")
  sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  sc._jsc.hadoopConfiguration().set("fs.s3a.access.key","<access_key>")
  sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key","<bucket_secret>")
Once you have access, you can read the file directly from Object Storage:
from pyspark.sql import SQLContext
sql = SQLContext(sc)
df = sql.read.parquet("s3a://<bucket_name>/<object_path>")
For more information, see Spark settings for integration with Yandex Object Storage.