首頁>Program>source

我有一些要通過Spark打開的zip檔案.由於Hadoop的本機編解碼器支援,我可以打開.gzip檔案没有問题,但是無法使用.zip檔案打開它。

是否有一種簡便的方法来讀取Spark代碼中的zip檔案? 我還搜尋了要添加到CompressionCodecFactory中的zip編解碼器實現,但到目前為止仍未成功。

最新回復
  • 5月前
    1 #

    請尝試以下代碼:

    using API sparkContext.newAPIHadoopRDD(
        hadoopConf,
        InputFormat.class,
        ImmutableBytesWritable.class, Result.class)
    

  • 5月前
    2 #

    没有使用python代碼的解決方案,最近我不得不阅讀pyspark中的zip檔案.而且,在搜尋操作方法時,我遇到了這个問题.因此,希望這会對其他人有所帮助。

    import zipfile
    import io
    def zip_extract(x):
        in_memory_data = io.BytesIO(x[1])
        file_obj = zipfile.ZipFile(in_memory_data, "r")
        files = [i for i in file_obj.namelist()]
        return dict(zip(files, [file_obj.open(file).read() for file in files]))
    
    zips = sc.binaryFiles("hdfs:/Testing/*.zip")
    files_data = zips.map(zip_extract).collect()
    

    在上面的代碼中,我返迴了一个字典,其中以zip中的檔案名作為键,並以每个檔案中的文字資料作為值.您可以根据需要进行更改。

  • 5月前
    3 #

    我遇到了類似的問题,並且使用以下代碼已解決

    sparkContext.binaryFiles("/pathToZipFiles/*")
    .flatMap { case (zipFilePath, zipContent) =>
            val zipInputStream = new ZipInputStream(zipContent.open())
            Stream.continually(zipInputStream.getNextEntry)
            .takeWhile(_ != null)
            .flatMap { zipEntry => ??? }
        }
    

  • 5月前
    4 #

    @ user3591785指出了我正確的方向,所以我將他的迴答標記為正確。

    有關更多详细資訊,我能够搜尋zipFileInputFormat Hadoop,並遇到以下鏈接:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

    通過使用zipFileInputFormat及其帮助程式zipfileRecordReader類,我能够使Spark完美打開並讀取zip檔案.

       rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
    

    結果是一个包含一个元素的地圖.檔案名作為键,內容作為值,因此我需要將其轉換為JavaPairRdd.我確定如果愿意,您可以用Byteswritable替換Text,並用其他东西替換ArrayList,但是我的目標是首先執行一些东西。

    JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
            List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
            InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
            String line;
            while ((line = br.readLine()) != null) {
            Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
                newList.add(newTuple);
            }
            return newList;
        }
    });
    

  • 5月前
    5 #

    using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
    

    檔案名應使用conf傳遞

    conf=( new Job().getConfiguration())
    conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
    sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
    

    請找到 PROPERTY_NAME 从輸入格式化程式設置路徑

  • 如何从MySQL中的日期時間中减去小時?
  • performance:傳遞大量資料的查詢的最佳MySQL設置?