Storm-HDFS: Core Storm APIs
The primary classes of the storm-hdfs connector are
HdfsBolt and SequenceFileBolt, both located in the
org.apache.storm.hdfs.bolt package. Use the HdfsBolt
class to write text data to HDFS and the SequenceFileBolt class to write
binary data.
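The remainder of this section focuses on HdfsBolt. For comparison, here is a minimal sketch of a SequenceFileBolt configured to write compressed binary key-value records; the field names ("timestamp", "sentence"), the output path, and the compression settings are illustrative assumptions, not requirements of the connector.

```java
import org.apache.hadoop.io.SequenceFile;
import org.apache.storm.hdfs.bolt.SequenceFileBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Sync every 1,000 tuples and rotate files at 5 MB
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withExtension(".seq")
        .withPath("/data/");   // illustrative path

// Write tuples as SequenceFile records, keyed by the "timestamp" field with
// the "sentence" field as the value (field names are illustrative)
SequenceFileBolt bolt = new SequenceFileBolt()
        .withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withSequenceFormat(new DefaultSequenceFormat("timestamp", "sentence"))
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .withCompressionType(SequenceFile.CompressionType.RECORD)
        .withCompressionCodec("deflate");
```

Note that the provided DefaultSequenceFormat expects a long key field and a string value field; other tuple layouts require a custom SequenceFormat implementation.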
For more information about the HdfsBolt class, refer to the Apache
Storm HdfsBolt documentation.
Specify the following information when instantiating the bolt:
HdfsBolt Methods
- `withFsUrl`: Specifies the target HDFS URL and port number.
- `withRecordFormat`: Specifies the delimiter that indicates a boundary between data records. Storm developers can customize this behavior by writing their own implementation of the `org.apache.storm.hdfs.bolt.format.RecordFormat` interface (a minimal custom implementation is sketched after this list). Use the provided `org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat` class as a convenience for writing delimited text data with delimiters such as tabs, commas, and pipes. The storm-hdfs bolt uses the `RecordFormat` implementation to convert tuples to byte arrays, so this method can be used with both text and binary data.
- `withRotationPolicy`: Specifies when to stop writing to a data file and begin writing to another. Storm developers can customize this behavior by writing their own implementation of the `org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy` interface; the provided `FileSizeRotationPolicy` class rotates files when they reach a configured size.
- `withSyncPolicy`: Specifies how frequently to flush buffered data to the HDFS filesystem. This action enables other HDFS clients to read the synchronized data, even as the Storm client continues to write data. Storm developers can customize this behavior by writing their own implementation of the `org.apache.storm.hdfs.bolt.sync.SyncPolicy` interface.
- `withFileNameFormat`: Specifies the name of the data file. Storm developers can customize this behavior by writing their own implementation of the `org.apache.storm.hdfs.bolt.format.FileNameFormat` interface. The provided `org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat` creates file names with the following naming format: `{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`. Example: `MyBolt-5-7-1390579837830.txt`.
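To make the customization hook mentioned above concrete, here is a minimal sketch of a custom RecordFormat; the class name JsonLineRecordFormat and its output layout are illustrative assumptions, not part of the connector:

```java
import java.nio.charset.StandardCharsets;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.tuple.Tuple;

// Hypothetical RecordFormat that renders each tuple as one JSON-style line
public class JsonLineRecordFormat implements RecordFormat {
    @Override
    public byte[] format(Tuple tuple) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            // Pair each declared field name with its value in this tuple
            sb.append('"').append(tuple.getFields().get(i)).append("\":\"")
              .append(tuple.getValue(i)).append('"');
        }
        sb.append("}\n");
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

An instance would be passed to withRecordFormat() exactly as DelimitedRecordFormat is in the examples below.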
Example: Cluster Without High Availability ("HA")
The following example writes pipe-delimited files to the HDFS path
hdfs://localhost:8020/foo. After every 1,000 tuples it will synchronize with
the filesystem, making the data visible to other HDFS clients. It will rotate the files when
they reach 5 MB in size.
Note that the HdfsBolt is instantiated with an HDFS URL and port number.
```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Use "|" instead of "," as the field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// Synchronize the filesystem after every 1,000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
```
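With the bolt configured, it still must be attached to a topology. The following is a hedged sketch of that wiring; the spout id "sentence-spout", the hypothetical SentenceSpout class, the parallelism hints, and the topology name are all illustrative assumptions:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical wiring: SentenceSpout stands in for whatever spout emits the
// tuples that the HdfsBolt persists
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout(), 1);
builder.setBolt("hdfs-bolt", bolt, 2).shuffleGrouping("sentence-spout");

StormSubmitter.submitTopology("hdfs-writer-topology", new Config(), builder.createTopology());
```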
Example: HA-Enabled Cluster
The following example shows how to modify the previous example for an HA-enabled cluster.
Here the HdfsBolt is instantiated with a nameservice ID, instead of using an HDFS URL and port number.
```java
...
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://myNameserviceID")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
...
```
To obtain the nameservice ID, check the dfs.nameservices property in your hdfs-site.xml file; it is nnha in the following example:

```xml
<property>
    <name>dfs.nameservices</name>
    <value>nnha</value>
</property>
```

