We all know that Hadoop is implemented mainly in Java, so how can we interact with the Hadoop ecosystem from Python? I came across a very good article on this topic, and I am sharing it here, combining Google Translate with my own understanding.
            
You will learn how to load files directly from the Hadoop Distributed Filesystem (HDFS) into memory, move files from your local machine to HDFS, and set up Spark.
          
            
              
```python
from pathlib import Path

import pandas as pd
import numpy as np
```
          
Spark installation
First, install findspark, as well as pyspark in case you are working on a local machine. If you are following this tutorial on a Hadoop cluster, you can skip the pyspark install. For simplicity I will use the conda virtual environment manager (pro tip: create a virtual environment before you start, and do not break your system Python installation!).
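As a minimal sketch of that tip (the environment name `jupyter` and the Python version are only examples, chosen to match the paths that appear later in the article), a dedicated environment can be created before launching the notebook:

```python
# Create a dedicated conda environment first; activate it in a terminal
# (conda activate jupyter) before starting Jupyter and installing packages.
# The name "jupyter" and python=3.7 are hypothetical choices.
!conda create -n jupyter python=3.7 -y
```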
            
```python
!conda install -c conda-forge findspark -y
!conda install -c conda-forge pyspark -y
```
Spark setup with findspark
            
              
```python
import findspark

# Local Spark
# findspark.init('/home/cloudera/miniconda3/envs/jupyter/lib/python3.7/site-packages/pyspark/')

# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')
```
          
Create a SparkSession (the programmatic equivalent of entering the pyspark shell)
            
              
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()
```
          
Let's get the existing databases. I assume you are already familiar with the Spark DataFrame API and its methods:
            
```python
spark.sql("show databases").show()
```
          
          
```
+------------+
|databaseName|
+------------+
|  __ibis_tmp|
|   analytics|
|         db1|
|     default|
|     fhadoop|
|        juan|
+------------+
```
pandas -> spark
The first integration is about how to move data from pandas (the de facto standard Python library for in-memory data manipulation) to Spark. First, let's load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity; the actual data does not matter for moving it from one place to another). You can download it here. Make sure you install pytables to read HDF5 data.
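A minimal sketch of that prerequisite (the plain conda invocation is an assumption; any recent pytables build will do):

```python
# pytables provides the HDF5 backend used by pandas.read_hdf
!conda install pytables -y
```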
            
```python
air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()
```
          
          
| date | BEN | CH4 | CO | EBE | NMHC | NO | NO_2 | NOx | O_3 | PM10 | PM25 | SO_2 | TCH | TOL |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2001-07-01 01:00:00 | 30.65 | NaN | 6.91 | 42.639999 | NaN | NaN | 381.299988 | 1017.000000 | 9.010000 | 158.899994 | NaN | 47.509998 | NaN | 76.050003 | 
| 2001-07-01 02:00:00 | 29.59 | NaN | 2.59 | 50.360001 | NaN | NaN | 209.500000 | 409.200012 | 23.820000 | 104.800003 | NaN | 20.950001 | NaN | 84.900002 | 
| 2001-07-01 03:00:00 | 4.69 | NaN | 0.76 | 25.570000 | NaN | NaN | 116.400002 | 143.399994 | 31.059999 | 48.470001 | NaN | 11.270000 | NaN | 20.980000 | 
| 2001-07-01 04:00:00 | 4.46 | NaN | 0.74 | 22.629999 | NaN | NaN | 116.199997 | 149.300003 | 23.780001 | 47.500000 | NaN | 10.100000 | NaN | 14.770000 | 
| 2001-07-01 05:00:00 | 2.18 | NaN | 0.57 | 11.920000 | NaN | NaN | 100.900002 | 124.800003 | 29.530001 | 49.689999 | NaN | 7.680000 | NaN | 8.970000 | 
            
```python
air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
```
          
We can simply load from pandas into Spark with createDataFrame:
            
```python
air_quality_sdf = spark.createDataFrame(air_quality_df)

air_quality_sdf.dtypes
```
          
Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be easily manipulated with PySpark methods:
            
```python
air_quality_sdf.select('date', 'NOx').show(5)
```
          
          
```
+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
```
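To give a flavour of what "easily manipulated" means, here is a small sketch (not part of the original article) that aggregates the same DataFrame with standard PySpark functions; the column names are the ones shown above:

```python
from pyspark.sql import functions as F

# Average NOx concentration per day (a sketch)
air_quality_sdf \
    .withColumn('day', F.to_date('date')) \
    .groupBy('day') \
    .agg(F.avg('NOx').alias('avg_NOx')) \
    .orderBy('day') \
    .show(5)
```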
          
pandas -> spark -> hive
To persist a Spark DataFrame into HDFS, where it can be queried with the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:
            
```python
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
```
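As a quick sanity check (a sketch, not part of the original article), the temporary view can be queried right away through the same Spark SQL engine:

```python
# The temporary view is immediately visible to Spark SQL
spark.sql("select count(*) from air_quality_sdf").show()
```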
              
            
          
Once the temporary view is created, a persistent table can be created from it with the Spark SQL engine, using `create table as select`. Before creating this table I will create a new database called analytics to store it:
            
```python
sql_drop_table = """
drop table if exists analytics.pandas_spark_hive
"""

sql_drop_database = """
drop database if exists analytics cascade
"""

sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""

sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""

print("dropping database...")
result_drop_db = spark.sql(sql_drop_database)

print("creating database...")
result_create_db = spark.sql(sql_create_database)

print("dropping table...")
result_droptable = spark.sql(sql_drop_table)

print("creating table...")
result_create_table = spark.sql(sql_create_table)
```
```
dropping database...
creating database...
dropping table...
creating table...
```
            
          
The result can be checked with the Spark SQL engine, for example by selecting the ozone pollutant concentration over time:
            
              spark.sql("select * from analytics.pandas_spark_hive").select("date_parsed", "O_3").show(5)
            
          
          
```
+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
```
          
Apache Arrow
Apache Arrow is an in-memory columnar data format created to support high-performance operations in big-data environments (you can think of it as the in-memory equivalent of the parquet format). It is developed in C++, but its Python API is great, as you will see now. But first, please install it:
            
```python
!conda install pyarrow -y
```
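As a quick, hedged illustration of the "in-memory parquet" analogy (this snippet is not part of the original article), a pandas DataFrame can be converted to an Arrow Table and back entirely in memory:

```python
import pyarrow as pa

# Convert the pandas DataFrame loaded earlier into Arrow's columnar representation
arrow_table = pa.Table.from_pandas(air_quality_df)
print(arrow_table.schema)

# ...and back to pandas, without ever touching disk
round_trip_df = arrow_table.to_pandas()
```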
            
          
To establish native communication with HDFS I will use the interface included in pyarrow. The only requirement is to set an environment variable pointing to the location of libhdfs. Remember that we are in a Cloudera environment; if you are using Hortonworks you will have to find the proper location (trust me, it exists).
Establishing the connection
            
              
```python
import pyarrow as pa
import os

os.environ['ARROW_LIBHDFS_DIR'] = '/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'

hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')
```
          
Listing files in HDFS
Let's list the files persisted by Spark before. Remember that those files were previously loaded from a local file into a pandas DataFrame and then into a Spark DataFrame. By default Spark writes its output partitioned into a bunch of snappy-compressed files. In the HDFS path you can identify the database name (analytics) and the table name (pandas_spark_hive):
            
```python
hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')
```
```
['/user/cloudera/analytics/pandas_spark_hive/_SUCCESS',
 '/user/cloudera/analytics/pandas_spark_hive/part-00000-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00001-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00002-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00003-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00004-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00005-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00006-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00007-b4371c8e-0f5...
```
          
Reading parquet files directly from HDFS
To read parquet files (or a folder full of files representing a table) directly from HDFS, I will use the PyArrow HDFS interface created before:
            
```python
table = hdfs_interface.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')
```
          
HDFS -> pandas
Once the parquet files are read by the PyArrow HDFS interface, a Table object is created. We can easily go back to pandas with the to_pandas method:
            
```python
table_df = table.to_pandas()
table_df.head()
```
```
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:752: FutureWarning: .labels was deprecated in version 0.24.0. Use .codes instead.
  labels, = index.labels
```
          
          
| | date_parsed | date | BEN | CH4 | CO | EBE | NMHC | NO | NO_2 | NOx | O_3 | PM10 | PM25 | SO_2 | TCH | TOL |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2001-06-30 23:00:00 | 2001-07-01 01:00:00 | 30.65 | NaN | 6.91 | 42.639999 | NaN | NaN | 381.299988 | 1017.000000 | 9.010000 | 158.899994 | NaN | 47.509998 | NaN | 76.050003 | 
| 1 | 2001-07-01 00:00:00 | 2001-07-01 02:00:00 | 29.59 | NaN | 2.59 | 50.360001 | NaN | NaN | 209.500000 | 409.200012 | 23.820000 | 104.800003 | NaN | 20.950001 | NaN | 84.900002 | 
| 2 | 2001-07-01 01:00:00 | 2001-07-01 03:00:00 | 4.69 | NaN | 0.76 | 25.570000 | NaN | NaN | 116.400002 | 143.399994 | 31.059999 | 48.470001 | NaN | 11.270000 | NaN | 20.980000 | 
| 3 | 2001-07-01 02:00:00 | 2001-07-01 04:00:00 | 4.46 | NaN | 0.74 | 22.629999 | NaN | NaN | 116.199997 | 149.300003 | 23.780001 | 47.500000 | NaN | 10.100000 | NaN | 14.770000 | 
| 4 | 2001-07-01 03:00:00 | 2001-07-01 05:00:00 | 2.18 | NaN | 0.57 | 11.920000 | NaN | NaN | 100.900002 | 124.800003 | 29.530001 | 49.689999 | NaN | 7.680000 | NaN | 8.970000 | 
Uploading local files to HDFS
All kinds of HDFS operations are supported by the PyArrow HDFS interface, for example uploading a bunch of local files to HDFS:
            
```python
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    with open(str(f), 'rb') as f_upl:
        hdfs_interface.upload(destination_path + f.name, f_upl)
```
```
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
```
          
Let's check that the files have been uploaded correctly by listing the destination path:
            
```python
hdfs_interface.ls(destination_path)
```
```
['/user/cloudera/analytics/data/diamonds_test.csv',
 '/user/cloudera/analytics/data/diamonds_train.csv',
 '/user/cloudera/analytics/data/madrid.h5',
 '/user/cloudera/analytics/data/sandp500.zip',
 '/user/cloudera/analytics/data/stations.csv']
```
          
Reading arbitrary files (not parquet) from HDFS (HDFS -> pandas example)
For example, a .csv file can be loaded from HDFS directly into a pandas DataFrame using the open method and the standard pandas read_csv function, which accepts a buffer as input:
            
```python
diamonds_train = pd.read_csv(hdfs_interface.open('/user/cloudera/analytics/data/diamonds_train.csv'))
diamonds_train.head()
```
            
          
| | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 | 
| 1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 | 
| 2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 | 
| 3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 | 
| 4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 | 
If you are interested in all the methods and possibilities this library offers, please visit: https://arrow.apache.org/docs/python/filesystems.html#hdfs-api
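As a non-exhaustive sketch of a few more of those operations (the specific paths and filenames below are hypothetical examples, not taken from the article):

```python
# A few more operations exposed by the PyArrow HDFS interface (sketch)
hdfs_interface.mkdir('/user/cloudera/analytics/tmp')        # create a directory
hdfs_interface.exists('/user/cloudera/analytics/data')      # check that a path exists

# Download an HDFS file into a local binary stream
with open('madrid_local.h5', 'wb') as local_f:
    hdfs_interface.download('/user/cloudera/analytics/data/madrid.h5', local_f)

hdfs_interface.delete('/user/cloudera/analytics/tmp', recursive=True)
```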
            
              
            
              
          
WebHDFS
Sometimes it is not possible to access libhdfs, the native HDFS library (for example, when running analytics from a computer that is not part of the cluster). In that case we can rely on WebHDFS (the HDFS service REST API); it is slower and not suitable for heavy big-data loads, but it is an interesting option for lightweight workloads. Let's install a WebHDFS Python API:
            
```python
!conda install -c conda-forge python-hdfs -y
```
```
Collecting package metadata: done
Solving environment: done

## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - python-hdfs

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         149 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         149 KB

The following packages will be UPDATED:

  ca-certificates    pkgs/main::ca-certificates-2019.1.23-0 --> conda-forge::ca-certificates-2019.3.9-hecc5488_0

The following packages will be SUPERSEDED by a higher-priority channel:

  certifi            pkgs/main --> conda-forge
  openssl            pkgs/main::openssl-1.1.1b-h7b6447c_1 --> conda-forge::openssl-1.1.1b-h14c3975_1

Downloading and Extracting Packages
certifi-2019.3.9     | 149 KB    | ##################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
```
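Under the hood WebHDFS is plain HTTP; as a hedged illustration (not part of the original article, and the host and port are just the defaults used later), the directory listing shown below could also be obtained with a raw REST call:

```python
import requests

# WebHDFS REST equivalent of listing a directory (sketch)
resp = requests.get(
    'http://localhost:50070/webhdfs/v1/user/cloudera/analytics/data',
    params={'op': 'LISTSTATUS', 'user.name': 'cloudera'},
)
print(resp.json())
```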
            
          
Establishing a WebHDFS connection
            
              
```python
from hdfs import InsecureClient

web_hdfs_interface = InsecureClient('http://localhost:50070', user='cloudera')
```
          
List files in HDFS
Listing files is similar to using the PyArrow interface; just use the list method and an HDFS path:
            
```python
web_hdfs_interface.list('/user/cloudera/analytics/data')
```
```
['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']
```
          
Uploading local files to HDFS with WebHDFS
            
```python
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data_web_hdfs/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    web_hdfs_interface.upload(destination_path + f.name,
                              str(f),
                              overwrite=True)
```
```
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
```
          
Let's check that the upload was correct:
            
```python
web_hdfs_interface.list(destination_path)
```
```
['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']
```
          
Larger files can also be handled this way (with some limitations). These files come from the Kaggle Microsoft Malware competition and weigh a couple of GB each:
            
```python
web_hdfs_interface.upload(destination_path + 'train.parquet',
                          '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq',
                          overwrite=True);
web_hdfs_interface.upload(destination_path + 'test.parquet',
                          '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/test.pq',
                          overwrite=True);
```
          
Reading files from HDFS with WebHDFS (HDFS -> pandas example)
In this case it is useful to use the PyArrow parquet module, passing a buffer to create a Table object. Afterwards, a pandas DataFrame can easily be created from the Table object using the to_pandas method:
            
              
```python
from pyarrow import parquet as pq
from io import BytesIO

with web_hdfs_interface.read(destination_path + 'train.parquet') as reader:
    microsoft_train = pq.read_table(BytesIO(reader.read())).to_pandas()

microsoft_train.head()
```
          
          
| | MachineIdentifier | ProductName | EngineVersion | AppVersion | AvSigVersion | IsBeta | RtpStateBitfield | IsSxsPassiveMode | DefaultBrowsersIdentifier | AVProductStatesIdentifier | … | Census_FirmwareVersionIdentifier | Census_IsSecureBootEnabled | Census_IsWIMBootEnabled | Census_IsVirtualDevice | Census_IsTouchEnabled | Census_IsPenCapable | Census_IsAlwaysOnAlwaysConnectedCapable | Wdft_IsGamer | Wdft_RegionIdentifier | HasDetections |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0000028988387b115f69f31a3bf04f09 | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1735.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 36144.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 10.0 | 0 | 
| 1 | 000007535c3f730efa9ea0b7ef1bd645 | win8defender | 1.1.14600.4 | 4.13.17134.1 | 1.263.48.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 57858.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 8.0 | 0 | 
| 2 | 000007905a28d863f6d0d597892cd692 | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1341.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 52682.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 3.0 | 0 | 
| 3 | 00000b11598a75ea8ba1beea8459149f | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1527.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 20050.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 3.0 | 1 | 
| 4 | 000014a5f00daa18e76b81417eeb99fc | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1379.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 19844.0 | 0 | 0.0 | 0.0 | 0 | 0 | 0.0 | 0.0 | 1.0 | 1 | 
5 rows × 83 columns
             
            
              
          
Hive + Impala
Hive and Impala are two SQL engines for Hadoop. One is MapReduce based (Hive), while Impala is a more modern and faster in-memory implementation created and open-sourced by Cloudera. Both engines can be fully leveraged from Python using one of their multiple APIs. In this case I will show you impyla, which supports both engines. Let's install it with conda, and do not forget to install thrift_sasl version 0.2.1 (yes, it must be this specific version, otherwise it will not work):
            
```python
!conda install impyla thrift_sasl=0.2.1 -y
```
```
## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - impyla
    - thrift_sasl=0.2.1

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         155 KB
    ------------------------------------------------------------
                                           Total:         155 KB

The following packages will be SUPERSEDED by a higher-priority channel:

  ca-certificates    conda-forge::ca-certificates-2019.3.9~ --> pkgs/main::ca-certificates-2019.1.23-0
  certifi            conda-forge --> pkgs/main
  openssl            conda-forge::openssl-1.1.1b-h14c3975_1 --> pkgs/main::openssl-1.1.1b-h7b6447c_1

Downloading and Extracting Packages
certifi-2019.3.9     | 155 KB    | ##################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
```
          
Establishing the connection
            
              
```python
from impala.dbapi import connect
from impala.util import as_pandas
```

Hive (Hive -> pandas example)
          
The API follows the classic ODBC standard, which you may already be familiar with. impyla also ships a utility function called as_pandas that parses the result set (a list of tuples) into a pandas DataFrame. Use it with care: it has issues with certain data types and is not efficient for big-data workloads. Fetch the results both ways:
            
```python
hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')

# Option 1: raw tuples via fetchall()
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results = c.fetchall()

# Option 2: a pandas DataFrame via as_pandas()
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results_df = as_pandas(c)
```
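If you would rather avoid as_pandas on larger result sets, you can wrap the raw tuples returned by fetchall() in a DataFrame yourself. A minimal sketch, assuming the hive_conn object above and standard DB-API behaviour for cursor.description (the first field of each entry is the column name):

```python
import pandas as pd

# Build the DataFrame manually: column names come from cursor.description,
# rows come from the list of tuples returned by fetchall().
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    columns = [col[0] for col in c.description]
    results_df = pd.DataFrame(c.fetchall(), columns=columns)
```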
              
            
          
          Impala (Impala -> pandas example)
Using Impala follows the same pattern as Hive; just make sure you connect to the correct port, which in this case defaults to 21050:
            
```python
impala_conn = connect(host='localhost', port=21050)

with impala_conn.cursor() as c:
    c.execute('show databases')
    result_df = as_pandas(c)
```
          
          
| | name | comment |
|---|---|---|
| 0 | __ibis_tmp | |
| 1 | _impala_builtins | System database for Impala builtin functions |
| 2 | analytics | |
| 3 | db1 | |
| 4 | default | Default Hive database |
| 5 | fhadoop | |
| 6 | juan | |
Ibis Framework
Another option is the Ibis framework, a higher-level API over a fairly large collection of data sources, including HDFS and Impala. It is built around the idea of operating on those sources through Python objects and methods rather than writing SQL. Let's install it the same way as the other libraries:
            
```
!conda install ibis-framework -y
```
Let's create an HDFS interface and an Impala interface (in Ibis, the Impala client needs an HDFS interface object):
            
              
```python
import ibis

# WebHDFS interface (NameNode HTTP port) and the Impala client that uses it
hdfs_ibis = ibis.hdfs_connect(host='localhost', port=50070)
impala_ibis = ibis.impala.connect(host='localhost', port=21050,
                                  hdfs_client=hdfs_ibis, user='cloudera')
```
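Before moving on, it can be worth checking that the WebHDFS connection actually works. A small sketch, assuming the hdfs_ibis object above exposes an ls method (it does in the Ibis versions this article targets) and that the paths are placeholders for your own cluster:

```python
# List the HDFS root and a (hypothetical) user directory to verify connectivity.
print(hdfs_ibis.ls('/'))
print(hdfs_ibis.ls('/user/cloudera'))
```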
              
            
          
Once the interfaces are created, operations are performed by calling methods; there is no need to write more SQL. If you are familiar with ORMs (object-relational mappers), it is not exactly the same thing, but the underlying idea is very similar.
            
```python
impala_ibis.invalidate_metadata()   # refresh Impala's cached view of the metastore
impala_ibis.list_databases()
```
          
          
```
['__ibis_tmp',
 '_impala_builtins',
 'analytics',
 'db1',
 'default',
 'fhadoop',
 'juan']
```
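To make the "methods instead of SQL" idea more concrete, here is a small sketch, assuming the impala_ibis client above and the analytics database used throughout this article (list_tables and database are standard Ibis Impala-client calls, named here as an assumption about your Ibis version):

```python
# Roughly equivalent to SHOW TABLES IN analytics
impala_ibis.list_tables(database='analytics')

# A database handle whose methods operate on that database
analytics_db = impala_ibis.database('analytics')
analytics_db.list_tables()
```

The analytics_db handle is reused in the pandas -> Impala example further down.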
          
Impala -> pandas
Ibis works natively with pandas, so no conversion is needed. Reading a table returns a pandas DataFrame object:
            
```python
table = impala_ibis.table('pandas_spark_hive', database='analytics')
table_df = table.execute()   # pulls the result into a pandas DataFrame
table_df.head()
```
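Because an Ibis table is a lazy expression, you can also push work down to Impala and only pull the result into pandas. A small sketch, assuming the table expression defined above (count and limit are standard Ibis table-expression methods):

```python
# COUNT(*) runs inside Impala; only the scalar comes back
row_count = table.count().execute()

# Only ten rows travel back as a pandas DataFrame
preview = table.limit(10).execute()
```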
              
            
          
pandas -> Impala
Going from pandas to Impala with Ibis: select the database through the Impala interface, set permissions (depending on your cluster setup), and use the create method, passing the pandas DataFrame object as the argument:
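The create call itself is not shown in the snippet below, which only reads the result back. A minimal sketch of the write step, assuming analytics_db is the database handle from the earlier example and diamonds_df is a pandas DataFrame already in memory (both names are illustrative assumptions):

```python
# Hypothetical write: turn the in-memory DataFrame into an Impala table.
# Adjust HDFS/Impala permissions on your cluster as needed.
analytics_db.create_table('diamonds', diamonds_df)
```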
            
```python
# Read the newly created table back to verify the write
analytics_db.table('diamonds').execute().head(5)
```
          
          
| | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 |
| 1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 |
| 2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 |
| 3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 |
| 4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 |


 
					 
					