乐趣区

Spark内置图像数据源初探

作者:林武康,花名知瑕, 阿里巴巴计算平台事业部 EMR 团队的高级开发工程师,Apache HUE Contributor, 参与了多个开源项目的研发工作,对于分布式系统设计应用有较丰富的经验,目前主要专注于 EMR 数据开发相关的产品的研发工作。

概述

在 Apache Spark 2.4 中引入了一个新的内置数据源, 图像数据源. 用户可以通过 DataFrame API 加载指定目录的中图像文件, 生成一个 DataFrame 对象. 通过该 DataFrame 对象, 用户可以对图像数据进行简单的处理, 然后使用 MLlib 进行特定的训练和分类计算.
本文将介绍图像数据源的实现细节和使用方法.

简单使用

先通过一个例子来简单的了解下图像数据源使用方法. 本例设定有一组图像文件存放在阿里云的 OSS 上, 需要对这组图像加水印, 并压缩存储到 parquet 文件中. 废话不说, 先上代码:

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val imageDF = spark.read.format("image").load("oss://<bucket>/path/to/src/dir")
    imageDF.select("image.origin", "image.width", "image.height", "image.nChannels", "image.mode", "image.data")
        .map(row => {val origin = row.getAs[String]("origin")
          val width = row.getAs[Int]("width")
          val height = row.getAs[Int]("height")
          val mode = row.getAs[Int]("mode")
          val nChannels = row.getAs[Int]("nChannels")
          val data = row.getAs[Array[Byte]]("data")
          Row(Row(origin, height, width, nChannels, mode,
            markWithText(width, height, BufferedImage.TYPE_3BYTE_BGR, data, "EMR")))
        }).write.format("parquet").save("oss://<bucket>/path/to/dst/dir")
  }

  def markWithText(width: Int, height: Int, imageType: Int, data: Array[Byte], text: String): Array[Byte] = {val image = new BufferedImage(width, height, imageType)
    val raster = image.getData.asInstanceOf[WritableRaster]
    val pixels = data.map(_.toInt)
    raster.setPixels(0, 0, width, height, pixels)
    image.setData(raster)
    val buffImg = new BufferedImage(width, height, imageType)
    val g = buffImg.createGraphics
    g.drawImage(image, 0, 0, null)
    g.setColor(Color.red)
    g.setFont(new Font("宋体", Font.BOLD, 30))
    g.drawString(text, width/2, height/2)
    g.dispose()
    val buffer = new ByteArrayOutputStream
    ImageIO.write(buffImg, "JPG", buffer)
    buffer.toByteArray
  }

从生成的 parquet 文件中抽取一条图像二进制数据, 保存为本地 jpg, 效果如下:

你可能注意到两个图像到颜色并不相同, 这是因为 Spark 的图像数据将图像解码为 BGR 顺序的数据, 而示例程序在保存的时候, 没有处理这个变换, 导致颜色出现了反差.

实现初窥

下面我们深入到 spark 源码中来看一下实现细节.Apache Spark 内置图像数据源的实现代码在 spark-mllib 这个模块中. 主要包括两个类:

  • org.apache.spark.ml.image.ImageSchema
  • org.apache.spark.ml.source.image.ImageFileFormat

其中,ImageSchema 定义了图像文件加载为 DataFrame 的 Row 的格式和解码方法.ImageFileFormat 提供了面向存储层的读写接口.

格式定义

一个图像文件被加载为 DataFrame 后, 对应的如下:

    StructField("origin", StringType, true) ::
    StructField("height", IntegerType, false) ::
    StructField("width", IntegerType, false) ::
    StructField("nChannels", IntegerType, false) ::
    // OpenCV-compatible type: CV_8UC3 in most cases
    StructField("mode", IntegerType, false) ::
    // Bytes in OpenCV-compatible order: row-wise BGR in most cases
    StructField("data", BinaryType, false) :: Nil)

  val imageFields: Array[String] = columnSchema.fieldNames
  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)

如果将该 DataFrame 打印出来, 可以得到如下形式的表:

|image.origin        |image.width|image.height|image.nChannels|image.mode|image.data         |
+--------------------+-----------+------------+---------------+----------+-------------------+
|oss://.../dir/1.jpg |600        |343         |3              |16        |55 45 21 56  ...   |
+--------------------+-----------+------------+---------------+----------+-------------------+

其中:

  • origin: 原始图像文件的路径
  • width: 图像的宽度, 单位像素
  • height: 图像的高度, 单位像素
  • nChannels: 图像的通道数, 如常见的 RGB 位图为通道数为 3
  • mode: 像素矩阵 (data) 中元素的数值类型和通道顺序, 与 OpenCV 的类型兼容
  • data: 解码后的像素矩阵

提示: 关于图像的基础支持, 可以参考如下文档: Image file reading and writing

加载和解码

图像文件通过 ImageFileFormat 加载为一个 Row 对象.

// 为了简化说明起见, 代码有删减和改动
private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
  ......

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {throw new UnsupportedOperationException("Write is not supported for image data source")
  }

  override protected def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {    
    ......
    (file: PartitionedFile) => {
    ......
        val path = new Path(origin)
        val stream = fs.open(path)
        val bytes = ByteStreams.toByteArray(stream)
        val resultOpt = ImageSchema.decode(origin, bytes) // <-- 解码 
        val filteredResult = Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
     ......
          val converter = RowEncoder(requiredSchema)
          filteredResult.map(row => converter.toRow(row))
     ......
      }
    }
  }
}

从上可以看出:

  • 当前的图像数据源实现并不支持保存操作;
  • 图像数据的解码工作在 ImageSchema 中完成.

下面来看一下具体的解码过程:

// 为了简化说明起见, 代码有删减和改动
private[spark] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
        // 使用 ImageIO 加载原始图像数据
    val img = ImageIO.read(new ByteArrayInputStream(bytes))
    if (img != null) {
      // 获取图像的基本属性
      val isGray = img.getColorModel.getColorSpace.getType == ColorSpace.TYPE_GRAY
      val hasAlpha = img.getColorModel.hasAlpha
      val height = img.getHeight
      val width = img.getWidth
      // ImageIO::ImageType -> OpenCV Type
      val (nChannels, mode) = if (isGray) {(1, ocvTypes("CV_8UC1"))
      } else if (hasAlpha) {(4, ocvTypes("CV_8UC4"))
      } else {(3, ocvTypes("CV_8UC3"))
      }
            // 解码
      val imageSize = height * width * nChannels
      // 用于存储解码后的像素矩阵
      val decoded = Array.ofDim[Byte](imageSize)
      if (isGray) {
        // 处理单通道图像
        ...
      } else {
        // 处理多通道图像
        var offset = 0
        for (h <- 0 until height) {for (w <- 0 until width) {val color = new Color(img.getRGB(w, h), hasAlpha)
            // 解码后的通道顺序为 BGR(A)
            decoded(offset) = color.getBlue.toByte
            decoded(offset + 1) = color.getGreen.toByte
            decoded(offset + 2) = color.getRed.toByte
            if (hasAlpha) {decoded(offset + 3) = color.getAlpha.toByte
            }
            offset += nChannels
          }
        }
      }
      // 转换为一行数据
      Some(Row(Row(origin, height, width, nChannels, mode, decoded)))
    }
  }

从上可以看出:

  • 本数据源在实现上使用 javax 的 ImageIO 库实现各类格式的图像文件的解码.ImageIO 虽然是一个十分强大和专业的 java 图像处理库, 但是和更专业的 CV 库 (如 OpenCV) 比起来, 性能上和功能上差距还是非常大的;
  • 解码后的图像通道顺序和像素数值类型是固定的, 顺序固定为 BGR(A), 像素数值类型为 8U;
  • 最多支持 4 个通道, 因此像多光谱遥感图像这类可能包含数十个波段信息的图像就无法支持了;
  • 解码后输出的信息仅包含基本的长宽、通道数和模式等字段, 如果需要获取更为详细元数据, 如 exif,GPS 坐标等就爱莫能助了;
  • 数据源在生成 DataFrame 时执行了图像的解码操作, 并且解码后的数据存储在 Java 堆内内存中. 这在实际项目应该是一个比较粗放的实现方式, 会占用大量的资源, 包括内存和带宽(如果发生 shuffle 的话, 可以考虑参考同一个图像文件保存为 BMP 和 JPG 的大小差别).

编码和存储

从上分析可以看出, 当前图像数据源并不支持对处理后的像素矩阵进行编码并保存为指定格式的图像文件.

图像处理能力

当前版本 Apache Spark 并没有提供面向图像数据的 UDF, 图像数据的处理需要借助 ImageIO 库或其他更专业的 CV 库.

小结

当前 Apache Spark 的内置图像数据源可以较为方便的加载图像文件进行分析. 不过, 当前的实现还十分简陋, 性能和资源消耗应该都不会太乐观. 并且, 当前版本仅提供了图像数据的加载能力, 并没有提供常用处理算法的封装和实现, 也不能很好的支持更为专业的 CV 垂直领域的分析业务. 当然, 这和图像数据源在 Spark 中的定位有关(将图像数据作为输入用于训练 DL 模型, 这类任务对图像的处理本身要求并不多). 如果希望使用 Spark 框架完成更实际的图像处理任务, 还有很多工作要做, 比如:

  • 支持更加丰富的元数据模型
  • 使用更专业的编解码库和更灵活编解码流程控制
  • 封装面向 CV 的算子和函数
  • 更高效的内存管理
  • 支持 GPU

等等诸如此类的工作, 限于篇幅, 这里就不展开了.
好了, 再多说一句, 现在 Spark 已经支持处理图像数据了(虽然支持有限), 那么, 视频流数据还会远吗?


本文作者:开源大数据 EMR

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

退出移动版