Apache Hudi supports the Flink engine

Apache Hudi is a data lake framework developed and open sourced by Uber. It entered the Apache Incubator in January 2019 and graduated to a top-level Apache project in May of the following year. It is one of the most popular data lake frameworks.

1. Why decouple

Hudi has used Spark as its data processing engine since its birth. If users want to adopt Hudi as their data lake framework, they must bring Spark into their platform technology stack. A few years ago, using Spark as the big data processing engine was common, even taken for granted: Spark can run batch jobs and can simulate streaming with micro-batches, so one engine, in a stream-batch-unified fashion, solved both streaming and batch problems. In recent years, however, Flink, another big data processing engine, has gradually entered people’s field of vision and taken a significant share of the compute engine market; Spark is no longer the only option. In big data communities, forums, and other venues, the question of whether Hudi supports the Flink compute engine has come up more and more often. Making Hudi support the Flink engine is therefore valuable, and the prerequisite for integrating the Flink engine is decoupling Hudi from Spark.

At the same time, looking at the mature and active frameworks in the big data field, all of them are elegantly designed, integrate well with other frameworks, and leverage each other’s strengths. Decoupling Hudi from Spark and turning it into an engine-agnostic data lake framework will undoubtedly create more possibilities for integrating Hudi with other components, allowing Hudi to fit better into the big data ecosystem.

2. Decoupling difficulties

Hudi uses the Spark API internally as casually as we use List in everyday development. From reading data at the source to finally writing data out to the table, Spark RDD is used everywhere as the main data structure, and even common utilities are implemented with the Spark API. Hudi is essentially a general-purpose data lake framework implemented with Spark, and its binding to Spark is deeply rooted.

In addition, the first engine to be integrated after the decoupling is Flink, and Flink and Spark differ greatly in their core abstractions. Spark assumes that data is bounded, and its core abstraction is a finite data set; Flink holds that the essence of data is a stream, and its core abstraction, DataStream, contains various operations on data. At the same time, Hudi operates on multiple RDDs simultaneously and combines the processing result of one RDD with another RDD. This difference in abstraction, together with the reuse of intermediate results in the implementation, makes it difficult for Hudi to use a single unified API to operate on both RDD and DataStream.

3. Decoupling ideas

In theory, Hudi uses Spark as its computing engine for Spark’s distributed computing power and RDD’s rich set of operators. Distributed computing power aside, Hudi treats RDD more as a data structure, and an RDD is essentially a bounded data set. It is therefore entirely feasible in theory to replace RDD with List (possibly at the cost of some performance). To preserve the performance and stability of the Spark version of Hudi as much as possible, we can keep the bounded data set as the basic unit of operation: Hudi’s main operation APIs remain unchanged, and the RDD is extracted as a generic type. The Spark engine implementation still uses RDD, while other engines use List or other bounded data sets as appropriate.

Principles of decoupling

1) Unify generics. The JavaRDD<HoodieRecord>, JavaRDD<HoodieKey>, and JavaRDD<WriteStatus> used in the Spark API are replaced with the generic types I, K, and O;

2) De-Sparkize. All APIs of the abstraction layer must have nothing to do with Spark. Specific operations that are hard to implement in the abstraction layer are rewritten as abstract methods and pushed down into Spark subclasses.

For example, Hudi uses the JavaSparkContext#map method in many places. To de-Sparkize, JavaSparkContext must be hidden. To solve this problem, we introduce the HoodieEngineContext#map method, which shields the concrete implementation details of map, so that de-Sparkization is achieved in the abstraction layer.

3) Minimize changes to the abstraction layer to ensure the functionality and performance of the original Hudi;

4) Replace JavaSparkContext with the HoodieEngineContext abstract class to provide the runtime environment context, as sketched below.
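
To make points 2) and 4) concrete, the following is a minimal sketch of what an abstraction-layer helper can look like after de-Sparkization, written against the HoodieEngineContext API introduced later in this article. The class, method, and variable names are illustrative only and are not actual Hudi code; imports of the Hudi types discussed in this article are omitted.

import java.util.List;

// Illustrative helper: nothing in it refers to Spark. The engine context
// decides how the map is actually executed.
public class PartitionCleaner {

  public List<Boolean> cleanPartitions(HoodieEngineContext context,
                                       List<String> partitionPaths, int parallelism) {
    // Before decoupling this would have been something like:
    //   jsc.parallelize(partitionPaths, parallelism).map(...).collect();
    // Now the Spark implementation of HoodieEngineContext still builds an RDD,
    // while a list-based implementation may simply run a (parallel) stream.
    return context.map(partitionPaths, path -> deletePath(path), parallelism);
  }

  private Boolean deletePath(String path) {
    // Placeholder for the actual file-system cleanup logic.
    return Boolean.TRUE;
  }
}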

4. Flink integration design

Hudi’s write operation is batch processing in nature, and the continuous mode of DeltaStreamer is realized by looping batch jobs. To use a unified API, Hudi’s Flink integration chooses to collect a batch of data before processing it, and finally to commit it in a unified manner (here Flink collects the data with a List).

The most obvious way to form batches is to use a time window. However, with windows, when no data flows in during a window there is no output, and it is hard for the sink to judge whether all the data of one batch has been processed. Therefore, we use Flink’s checkpoint mechanism to collect batches: the data between every two barriers forms one batch, and when a subtask has no data, a mock (empty) result is emitted to make up the count. In this way, on the sink side, once every subtask has emitted result data, the batch can be considered processed and the commit can be executed.
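
To illustrate this barrier-based batching, here is a simplified sketch of a custom operator that buffers incoming records and flushes them as one batch just before the checkpoint barrier is forwarded. This is not Hudi’s actual operator; the class name and output type are assumptions made for the example.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.common.model.HoodieRecord;

// Buffers the records that arrive between two checkpoint barriers and emits
// them downstream as a single batch when the barrier arrives.
public class BufferingWriteOperator extends AbstractStreamOperator<List<HoodieRecord>>
    implements OneInputStreamOperator<HoodieRecord, List<HoodieRecord>> {

  private transient List<HoodieRecord> buffer;

  @Override
  public void open() throws Exception {
    super.open();
    buffer = new ArrayList<>();
  }

  @Override
  public void processElement(StreamRecord<HoodieRecord> element) {
    // Records are only collected here; nothing is written yet.
    buffer.add(element.getValue());
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) {
    // The barrier closes the current batch. An empty list is still emitted so
    // that the sink can count one result per subtask even when there is no data.
    output.collect(new StreamRecord<>(new ArrayList<>(buffer)));
    buffer.clear();
  }
}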

  • Source receives Kafka data and converts it into List<HoodieRecord>;
  • InstantGeneratorOperator generates a globally unique instant. When the previous instant is not yet completed, or the current batch has no data, no new instant is created;
  • KeyBy partitionPath partitions the data by partitionPath to avoid multiple subtasks writing to the same partition;
  • WriteProcessOperator performs the write operation. When the current partition has no data, it sends an empty result downstream to make up the count;
  • CommitSink receives the results of the upstream tasks. Once it has received as many results as the parallelism, it considers all upstream subtasks finished and executes the commit (see the sketch after this list).
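
The commit-side counting described in the last bullet can be sketched roughly as follows. This is a simplified, illustrative sink rather than Hudi’s actual CommitSink; how the expected parallelism is obtained and how the commit is issued are assumptions, and the sink itself is assumed to run with parallelism 1.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hudi.client.WriteStatus;

// Collects one result list per upstream subtask and commits once all of them
// have arrived for the current batch.
public class BatchCommitSink extends RichSinkFunction<List<WriteStatus>> {

  private transient List<WriteStatus> statuses;   // write results of the current batch
  private transient int received;                 // subtask results seen so far
  private transient int expected;                 // number of upstream subtasks

  @Override
  public void open(Configuration parameters) {
    statuses = new ArrayList<>();
    received = 0;
    // Assumption: the upstream write operator runs with the job's default parallelism.
    expected = getRuntimeContext().getExecutionConfig().getParallelism();
  }

  @Override
  public void invoke(List<WriteStatus> value, Context context) {
    statuses.addAll(value);   // an empty list still counts as one subtask result
    received++;
    if (received == expected) {
      commit(statuses);       // all subtasks have reported: commit this batch
      statuses.clear();
      received = 0;
    }
  }

  private void commit(List<WriteStatus> batch) {
    // Placeholder: the real job would call the Hudi write client here to
    // commit the collected WriteStatus results for the current instant.
  }
}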

Note: InstantGeneratorOperator and WriteProcessOperator are both custom Flink operators. InstantGeneratorOperator blocks while checking the state of the previous instant to ensure that globally there is only one instant in an inflight (or requested) state. WriteProcessOperator is where the write operation is actually performed; the write is triggered at checkpoint time.
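
As a rough illustration of the blocking check in the note above (simplified and illustrative, not the operator’s actual code; it assumes the timeline is inspected through HoodieTableMetaClient):

import org.apache.hudi.common.table.HoodieTableMetaClient;

// Illustrative helper: block until no commit instant remains in a pending
// (requested or inflight) state, so that only one instant is active globally.
public final class InstantStateChecker {

  private InstantStateChecker() {
  }

  public static void waitForPreviousInstant(HoodieTableMetaClient metaClient, long pollIntervalMs)
      throws InterruptedException {
    while (metaClient.reloadActiveTimeline()
        .getCommitsTimeline()
        .filterPendingExcludingCompaction()   // requested or inflight instants
        .countInstants() > 0) {
      Thread.sleep(pollIntervalMs);           // wait for the previous instant to complete
    }
  }
}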

5. Implementation examples

1) HoodieTable

/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ……
}

HoodieTable is one of Hudi’s core abstractions; it defines the insert, upsert, bulkInsert and other operations supported by a table. Taking upsert as an example, the input data changes from the original JavaRDD<HoodieRecord> inputRdds to I records, and the runtime JavaSparkContext jsc changes to HoodieEngineContext context.

From the class comments we can see that T, I, K, and O represent the payload data type, input data type, primary key type, and output data type of a Hudi operation, respectively. These generics run through the entire abstraction layer.
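
To make the substitution concrete, here is an illustration of how the upsert signature reads once the generics are bound for each engine. The two interfaces are purely illustrative, not Hudi classes; imports of the Hudi types shown in this article are omitted.

import java.util.List;

import org.apache.spark.api.java.JavaRDD;

// Spark engine: I = JavaRDD<HoodieRecord>, O = JavaRDD<WriteStatus>
interface SparkUpsertExample {
  HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime,
      JavaRDD<HoodieRecord> records);
}

// List-based engine (e.g. the Flink integration): I = List<HoodieRecord>, O = List<WriteStatus>
interface ListBasedUpsertExample {
  HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context, String instantTime,
      List<HoodieRecord> records);
}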

2) HoodieEngineContext

/**
 * Base class contains the context information needed by the engine at runtime. It will be extended by different
 * engine implementation if needed.
 */
public abstract class HoodieEngineContext {

  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

}

HoodieEngineContext plays the role of JavaSparkContext. It not only provides all the information that JavaSparkContext provides, but also encapsulates methods such as map, flatMap, and foreach, hiding the concrete implementations of JavaSparkContext#map, JavaSparkContext#flatMap, JavaSparkContext#foreach, and so on.

Take the map method as an example. In HoodieSparkEngineContext, the Spark implementation class, the map method is as follows:

@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
}

In an engine that operates on List, the implementation can be as follows (different methods need to pay attention to thread safety; use parallel streams with caution):

@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
  return data.stream().parallel().map(func::apply).collect(Collectors.toList());
}

Note: Exceptions thrown inside the map function can be handled by wrapping the SerializableFunction<I, O> func; a sketch of such a wrapper follows the introduction of SerializableFunction below.

Here is a brief introduction to SerializableFunction:

@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {

  O apply(I v1) throws Exception;
}

This interface is in fact a variant of java.util.function.Function. It differs from java.util.function.Function in that SerializableFunction is serializable and can throw exceptions. It is introduced because the parameters that the JavaSparkContext#map function receives must be serializable, and because Hudi’s logic needs to throw many exceptions, while try/catch blocks inside lambda expressions are bloated and inelegant.
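
As for the wrapping mentioned in the note above, a minimal sketch of such a wrapper (an illustrative helper, not necessarily Hudi’s own utility) that adapts a throwing SerializableFunction for use inside Stream#map looks like this:

import java.util.function.Function;

public final class FunctionWrapper {

  private FunctionWrapper() {
  }

  // Adapts the throwing SerializableFunction into a plain java.util.function.Function,
  // rethrowing any checked exception as an unchecked one.
  public static <I, O> Function<I, O> throwingMapWrapper(SerializableFunction<I, O> func) {
    return input -> {
      try {
        return func.apply(input);
      } catch (Exception e) {
        throw new RuntimeException("Error applying map function", e);
      }
    };
  }
}

With such a wrapper, the list-based map shown earlier would call data.stream().parallel().map(throwingMapWrapper(func)) instead of using func::apply directly.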
