
A data engineer's guide to PyIceberg

Written by: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent

This article shows data engineers how to use PyIceberg, a lightweight Python library. PyIceberg facilitates common data tasks such as creating, modifying, or deleting data in Apache Iceberg tables, without the need for a large cluster.

Faced with complex business requirements and the need to analyze ever-larger volumes of information, data platforms have evolved significantly over the past few years to help companies extract more insight and value from a variety of data sources.

For enterprise analytics use cases, the open data lakehouse has been at the forefront of this evolution. An open data lakehouse enables data teams to create "composable" architectures within their ecosystem. With this pattern, data teams can design data platforms with the storage layer, compute, and data governance of their choice, and keep pace with evolving business needs. Open table formats (OTFs) such as Apache Iceberg are the engines driving the modern open lakehouse.

Composable data platform architecture

Data platforms designed with Apache Iceberg usually contain three layers:

  1. Data layer: The actual data files (usually Parquet, Avro, or ORC) are stored in this layer.
  2. Metadata layer: Instead of tracking tables as individual directories, Iceberg maintains a list of files. The metadata layer tracks table snapshots, schema, and file statistics.
  3. Iceberg catalog: The catalog is the central repository that facilitates table discovery, table creation and modification, and ensures transactional consistency when managing Iceberg tables.

This well-known diagram from the Apache Iceberg documentation shows these layers:

Apache Iceberg layers (source)

What is PyIceberg?

PyIceberg enables analysts and data engineers to build modern open lakehouse platforms on a wide range of clouds (AWS, Azure, Google Cloud) and on-premises storage. PyIceberg is the Python implementation for accessing Iceberg tables. Developers using PyIceberg can run Pythonic data transformations without needing to operate high-performance query engines on Java Virtual Machine (JVM) clusters. PyIceberg uses catalogs to load Iceberg tables and perform read and write operations. It handles the metadata and table format aspects of Iceberg, allowing data engineers to use small, fast, and highly efficient data engines such as PyArrow (the Python implementation of Apache Arrow), Daft, or DuckDB as the compute for processing the actual data.
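
For instance, once a catalog and table are loaded, a scan can be handed directly to one of these engines. Below is a minimal sketch; the catalog name, namespace, and table used here are placeholders, not part of the demo built later in this article:

from pyiceberg.catalog import load_catalog

# Load a catalog configured in .pyiceberg.yaml (names below are placeholders)
catalog = load_catalog(name="my_catalog")
table = catalog.load_table("analytics.events")

# Materialize the scan as an in-memory Arrow table for PyArrow-based processing ...
arrow_table = table.scan().to_arrow()
print(arrow_table.num_rows)

# ... or register the scan with an embedded DuckDB connection and query it with SQL
con = table.scan().to_duckdb(table_name="events")
print(con.sql("SELECT COUNT(*) FROM events").fetchall())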

PyIceberg can run as a standalone application or on Kubernetes clusters for fault tolerance. Its native integration with Iceberg catalog protocols such as REST, SQLCatalog, or AWS Glue makes it a versatile and easy choice for querying Iceberg tables without needing JVM/py4j clusters.

Production PyIceberg pipelines often integrate with streaming workloads such as Tableflow, where streaming data produced to Apache Kafka topics is materialized as Iceberg tables. This makes PyIceberg a powerful tool for bridging the gap between operational and analytical data systems.

Why PyIceberg?

PyIceberg provides a Python-friendly way to run data manipulation (DML) operations on Iceberg tables. For small to medium-sized datasets in the hundreds of gigabytes, such as those behind departmental analytics, internal reporting, or specialized tools, ease of use often matters more to companies than advanced features. If the dataset (historical and incremental) is not huge, deploying a full-blown distributed query engine just to run Iceberg workloads can feel like overkill. Those query engines (such as Spark and Flink) rely on Scala or Java code running on the Java Virtual Machine (JVM) to enforce and optimize distributed, multi-threaded processing. For Python programmers, that means going through py4j, which allows Python programs running in a Python interpreter to dynamically access Java objects in a JVM.

In the next section, let's build a quick demo with PyIceberg to understand how Iceberg read and write patterns work.

How to build a data lakehouse with PyIceberg, PyArrow, and DuckDB

Let's get hands-on and build a demo for IoT sensor data with PyIceberg. For this example, we will use PyIceberg and PyArrow to insert/upsert and delete Iceberg data, and build it in Visual Studio Code (VS Code).

First, a new Python virtual environment called "iceberg_playground" is created by running the following command:

$>python -m venv iceberg_playground

This directory, iceberg_playground, is then opened in VS Code, where the PyIceberg project will be hosted. The following image shows VS Code opened at this directory with a clean slate.

Then PyIceberg and the other required libraries are installed in the virtual environment by running the following commands:

$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas

This example uses PyIceberg's SqlCatalog, which stores the Iceberg table information in a local SQLite database. Iceberg also supports other catalogs, including REST, Hive, and AWS Glue.
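
For reference, a non-SQLite catalog can be configured the same way, either in .pyiceberg.yaml or directly through properties passed to load_catalog. The sketch below shows a hypothetical REST catalog; the URI, credential, and warehouse values are placeholders:

from pyiceberg.catalog import load_catalog

# Hypothetical REST catalog configuration; all values below are placeholders
rest_catalog = load_catalog(
    "rest_cat",
    **{
        "type": "rest",
        "uri": "https://iceberg-rest.example.com",
        "credential": "client-id:client-secret",
        "warehouse": "s3://my-bucket/warehouse",
    },
)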

A .pyiceberg.yaml configuration file is created in the project root with the following content:

catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1

Note how the Iceberg catalog is stored as a SQLite file within the pyiceberg_catalog directory, and how the warehouse, which stores all data and metadata, lives in the dw1 directory.

Both of these directories are now created at the project root level. The catalog is named pyarrowtest.

Next, the PyIceberg setup is verified using the following script:

import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

Notice how PyIceberg reads the catalog name from the YAML file and creates a local SQLite database in the pyiceberg_catalog directory. Since SQLite is distributed with the Python installer, it does not have to be installed separately.

If the script runs correctly, the "pyarrow_catalog" properties should be displayed in the terminal.

The script loaded the catalog from the YAML file, with the PYICEBERG_HOME environment variable set to the project root.

Next, a schema is added using the PyIceberg Schema class. Since this example stores data from a set of Internet of Things (IoT) sensors, the schema is created with three columns and their required data types. The device_id field is set as both the primary key and the partition key.

After that, a namespace is created along with an Iceberg table that uses the schema. A namespace is a logical grouping of tables inside a warehouse (recall how the warehouse was already defined in the YAML file).

Initial data is loaded using an in-memory PyArrow table, and sensor_table is read using PyIceberg's scan() to convert the data into a pandas DataFrame.

import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())

If the above script runs successfully, this result will be shown in the terminal:

With the insert completed successfully, the three layers of Iceberg can be validated:

  1. Catalog: The catalog is the SQLite pyarrow_catalog.db file, which will be examined shortly in this article.
  2. Metadata: Under the metadata directory, metadata files are created, which are crucial for enabling create, read, update, delete (CRUD) operations. Metadata JSON files are created, one at table creation and another after the first data insert, along with snapshot manifest list files (snap-*).
  3. Data: Data files are written in .parquet format, with device_id as the partition key. Since there are three distinct devices, three partition directories are created. The Iceberg table sensor_data is created under the sensor_ns.db namespace inside the dw1 warehouse, and the data files live within the sensor_data table directory, as the sketch after this list illustrates.
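
One quick way to validate this layout is to walk the warehouse directory and print the files PyIceberg has written. A minimal sketch, assuming the script is run from the project root where the dw1 warehouse directory lives:

import os

# Walk the warehouse directory and print the metadata (*.json, snap-*.avro, manifest *.avro)
# and data (*.parquet) files written by PyIceberg
for root, _, files in os.walk("dw1"):
    for file_name in files:
        print(os.path.join(root, file_name))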

PyIceberg expressions can be used to filter records. Some commonly used filter expressions are StartsWith, EqualTo, GreaterThan, Or, etc.

from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
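
Filters can also combine comparison and boolean expressions, and a scan can project only the columns of interest. A minimal sketch building on the same sensor_table (the threshold and prefix below are arbitrary examples):

from pyiceberg.expressions import And, GreaterThan, StartsWith

# Rows where ampere_hour > 20.0 AND device_id starts with 'b8',
# projecting only two columns
print(
    sensor_table.scan(
        row_filter=And(GreaterThan("ampere_hour", 20.0), StartsWith("device_id", "b8")),
        selected_fields=("device_id", "ampere_hour"),
    ).to_pandas()
)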

PyIceberg supports upsert operations as well. The following code updates the device_make of the "Siemens" sensor to "Kepware".

# Create an UPSERT batch of Arrow records where one of the device_make values is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]), join_cols=join_columns)

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())

Similarly, delete operations are also supported:

# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())
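
Each of these writes (the initial overwrite, the upsert, and the delete) commits a new snapshot to the table. As a quick sanity check, the snapshot history can be listed; a minimal sketch (attribute names may vary slightly across PyIceberg versions):

# List the snapshots committed by the operations above
for snapshot in sensor_table.snapshots():
    print(snapshot.snapshot_id, snapshot.timestamp_ms, snapshot.summary)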

It should be noted that deletes are a nuanced operation in any warehouse, and Iceberg is no exception. Deletes are defined at the Iceberg table level through two strategies: copy-on-write (CoW) and merge-on-read (MoR). Delete operations also create delete files for the MoR strategy.

PyIceberg supports MoR deletes, but with some nuances. While PyIceberg provides the ability to delete rows, this is primarily implemented using copy-on-write deletes by default, which means rewriting data files instead of creating delete files. However, there is ongoing work to strengthen MoR support with equality deletes and make it more efficient for frequent small updates.
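
The delete strategy is governed by Iceberg table properties such as write.delete.mode. The sketch below requests merge-on-read behavior at table creation time; whether the PyIceberg write path honors these properties depends on the version in use:

# Hypothetical: request merge-on-read semantics via standard Iceberg table properties.
# Whether PyIceberg's write path honors them varies across versions.
mor_table = catalog.create_table_if_not_exists(
    identifier="sensor_ns.sensor_data_mor",
    schema=schema,
    partition_spec=partition_spec,
    properties={
        "write.delete.mode": "merge-on-read",
        "write.update.mode": "merge-on-read",
    },
)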

As a final step, let's use DuckDB to query the SQLite Iceberg catalog stored in the pyarrow_catalog.db file. The following command is run in the VS Code terminal:

duckdb -ui pyiceberg_catalog/pyarrow_catalog.db

This will open a browser window on port 4213 (by default), where SQL queries can be run against the Iceberg catalog, as shown:

This provides a quick and easy way to extract insights from the SQL catalog.
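
The same catalog database can also be inspected programmatically with Python's built-in sqlite3 module. A minimal sketch, assuming PyIceberg's SqlCatalog default table name iceberg_tables and its usual columns:

import sqlite3

# Open the SQLite catalog file and list the registered Iceberg tables.
# 'iceberg_tables' and its columns are the SqlCatalog defaults (assumption).
con = sqlite3.connect("pyiceberg_catalog/pyarrow_catalog.db")
for row in con.execute(
    "SELECT table_namespace, table_name, metadata_location FROM iceberg_tables"
):
    print(row)
con.close()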

Open lakehouse insights with PyIceberg

For companies working with data volumes smaller than terabytes, PyIceberg and PyArrow are fast options for running interactive analytics in Python. Using this combination can significantly reduce the time to insight for small to medium-sized datasets.

Data engineers can get started with the PyIceberg documentation, which is well maintained and kept up to date. The API page also has great examples covering all of the PyIceberg APIs.

Happy coding!
