Preserving compression quantization on rewrite

I’d like to use Astropy to read a FITS file containing a tile-compressed image HDU and write a new file that preserves the original compressed image without quantizing again (while also modifying the header, adding some new HDUs, and rewriting some other HDUs more substantially).

I was hoping that reading a CompImageHDU from one file and appending it to an HDUList opened with mode='append' on the other file would do this, but it looks like Astropy decompresses the image data and then compresses it again, which changes the result. I think that’s because the ZZERO and ZSCALE values inferred from the never-quantized data differ from those inferred from the already-quantized data.

Is there some other way to extract the original compressed data and pass it around in memory to be reused on write, or (even better, because it’s cheaper) to preserve the ZZERO and ZSCALE values so they can be reused the next time the data are quantized?

Did you try with disable_image_compression=True? With this option you get the compressed data as a plain BinTableHDU, so appending it should directly copy the compressed tiles.

Yes, that’s what I ended up going with. Unfortunately it means I had to read the file twice, because I couldn’t figure out how to go from the disable_image_compression=True BinTableHDU back to a CompImageHDU (the constructor for the latter seems like it ought to work, but it didn’t, for reasons I don’t quite recall and can’t go back and check now).

It should be doable with CompImageHDU(bintable=hdu):

That doesn’t work for me on its own; I saw that code in the Astropy source, too, but it looks like something differs between the state of the BinTableHDU there and what I get from indexing the HDUList returned by astropy.io.fits.open(filename, disable_image_compression=True).

Here’s the traceback I get:

../../../install/conda/envs/lsst-scipipe-12.1.0/lib/python3.13/site-packages/astropy/io/fits/hdu/compressed/compressed.py:494: in data
    data = self.section[...]
           ^^^^^^^^^^^^^^^^^
../../../install/conda/envs/lsst-scipipe-12.1.0/lib/python3.13/site-packages/astropy/io/fits/hdu/compressed/section.py:61: in __getitem__
    data = decompress_image_data_section(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

compressed_data = FITS_rec([([128, 0, 0, ..., 65, 84, 176], 0.95107269, 2.04241298e+09),
          ([128, 0, 0, ..., 153, 212, 106], 0.9...04241299e+09)],
         dtype=(numpy.record, [('COMPRESSED_DATA', '>i4', (2,)), ('ZSCALE', '>f8'), ('ZZERO', '>f8')]))
compression_type = 'RICE_1'
compressed_header = XTENSION= 'BINTABLE'           / binary table extension                         
BITPIX  =                    8 / 8-bi...                                      
UZSCALE =    0.951072692871094                                                  
bintable = <astropy.io.fits.hdu.table.BinTableHDU object at 0x7cecebbd3cb0>
image_header = XTENSION= 'IMAGE   '                                                            
BITPIX  =                  -32 / data...                                      
UZSCALE =    0.951072692871094                                                  
first_tile_index = array([0, 0]), last_tile_index = array([4, 3])

    def decompress_image_data_section(
        compressed_data,
        compression_type,
        compressed_header,
        bintable,
        image_header,
        first_tile_index,
        last_tile_index,
    ):
        """
        Decompress the data in a `~astropy.io.fits.CompImageHDU`.
    
        Parameters
        ----------
        compressed_data : `~astropy.io.fits.FITS_rec`
            The compressed data
        compression_type : str
            The compression algorithm
        compressed_header : `~astropy.io.fits.Header`
            The header of the compressed binary table
        bintable : `~astropy.io.fits.BinTableHDU`
            The binary table HDU, used to access the raw heap data
        first_tile_index : iterable
            The indices of the first tile to decompress along each dimension
        last_tile_index : iterable
            The indices of the last tile to decompress along each dimension
    
        Returns
        -------
        data : `numpy.ndarray`
            The decompressed data array.
        """
        compressed_coldefs = compressed_data._coldefs
    
        _check_compressed_header(compressed_header)
    
        tile_shape = _tile_shape(compressed_header)
        data_shape = _data_shape(compressed_header)
    
        first_array_index = first_tile_index * tile_shape
        last_array_index = (last_tile_index + 1) * tile_shape
    
        last_array_index = np.minimum(data_shape, last_array_index)
    
        buffer_shape = tuple((last_array_index - first_array_index).astype(int))
    
        image_data = np.empty(
            buffer_shape, dtype=BITPIX2DTYPE[compressed_header["ZBITPIX"]]
        )
    
        quantized = "ZSCALE" in compressed_data.dtype.names
    
        if image_data.size == 0:
            return image_data
    
        settings = _header_to_settings(compressed_header)
        zbitpix = compressed_header["ZBITPIX"]
        dither_method = DITHER_METHODS[compressed_header.get("ZQUANTIZ", "NO_DITHER")]
        dither_seed = compressed_header.get("ZDITHER0", 0)
    
        # NOTE: in the following and below we convert the column to a Numpy array
        # for performance reasons, as accessing rows from a FITS_rec column is
        # otherwise slow.
        compressed_data_column = np.array(compressed_data["COMPRESSED_DATA"])
        compressed_data_dtype = _column_dtype(compressed_coldefs, "COMPRESSED_DATA")
    
        if "ZBLANK" in compressed_coldefs.dtype.names:
            zblank_column = np.array(compressed_data["ZBLANK"])
        else:
            zblank_column = None
    
        if "ZSCALE" in compressed_coldefs.dtype.names:
            zscale_column = np.array(compressed_data["ZSCALE"])
        else:
            zscale_column = None
    
        if "ZZERO" in compressed_coldefs.dtype.names:
            zzero_column = np.array(compressed_data["ZZERO"])
        else:
            zzero_column = None
    
        zblank_header = compressed_header.get("ZBLANK", image_header.get("BLANK", None))
    
        gzip_compressed_data_column = None
        gzip_compressed_data_dtype = None
    
        # If all the data is requested, read in all the heap.
        if tuple(buffer_shape) == tuple(data_shape):
            heap_cache = bintable._get_raw_data(
                compressed_header["PCOUNT"],
                np.uint8,
                bintable._data_offset + bintable._theap,
            )
        else:
            heap_cache = None
    
        for row_index, tile_slices in _iter_array_tiles(
            data_shape, tile_shape, first_tile_index, last_tile_index
        ):
            # For tiles near the edge, the tile shape from the header might not be
            # correct so we have to pass the shape manually.
            actual_tile_shape = image_data[tile_slices].shape
    
            settings = _update_tile_settings(settings, compression_type, actual_tile_shape)
    
            if compressed_data_column[row_index][0] == 0:
                if gzip_compressed_data_column is None:
                    gzip_compressed_data_column = np.array(
                        compressed_data["GZIP_COMPRESSED_DATA"]
                    )
                    gzip_compressed_data_dtype = _column_dtype(
                        compressed_coldefs, "GZIP_COMPRESSED_DATA"
                    )
    
                # When quantizing floating point data, sometimes the data will not
                # quantize efficiently. In these cases the raw floating point data can
                # be losslessly GZIP compressed and stored in the `GZIP_COMPRESSED_DATA`
                # column.
    
                cdata = _get_data_from_heap(
                    bintable,
                    *gzip_compressed_data_column[row_index],
                    gzip_compressed_data_dtype,
                    heap_cache=heap_cache,
                )
    
                tile_buffer = _decompress_tile(cdata, algorithm="GZIP_1")
    
                tile_data = _finalize_array(
                    tile_buffer,
                    bitpix=zbitpix,
                    tile_shape=actual_tile_shape,
                    algorithm="GZIP_1",
                    lossless=True,
                )
    
            else:
>               cdata = _get_data_from_heap(
                    bintable,
                    *compressed_data_column[row_index],
                    compressed_data_dtype,
                    heap_cache=heap_cache,
                )
E               TypeError: _get_data_from_heap() got multiple values for argument 'heap_cache'

(Edited because the first traceback I posted was due to user error.) This one is from running:

with astropy.io.fits.open(filename, disable_image_compression=True) as hdu_list:
    image_hdu = hdu_list[1]
    if isinstance(image_hdu, astropy.io.fits.BinTableHDU):
        image_hdu = astropy.io.fits.CompImageHDU(bintable=image_hdu)
    image_hdu.data  # force decompression; this is where the traceback occurs

It works with the example files from astropy, so maybe it’s something specific to the compression parameters in your file. If you can share an example file, I would be happy to have a look.