重构离线镜像下载
This commit is contained in:
577
src/imagetar.go
Normal file
577
src/imagetar.go
Normal file
@@ -0,0 +1,577 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/go-containerregistry/pkg/authn"
|
||||
"github.com/google/go-containerregistry/pkg/name"
|
||||
"github.com/google/go-containerregistry/pkg/v1"
|
||||
"github.com/google/go-containerregistry/pkg/v1/remote"
|
||||
"github.com/google/go-containerregistry/pkg/v1/types"
|
||||
)
|
||||
|
||||
// ImageStreamer 镜像流式下载器
|
||||
type ImageStreamer struct {
|
||||
concurrency int
|
||||
remoteOptions []remote.Option
|
||||
}
|
||||
|
||||
// ImageStreamerConfig 下载器配置
|
||||
type ImageStreamerConfig struct {
|
||||
Concurrency int
|
||||
}
|
||||
|
||||
// NewImageStreamer 创建镜像下载器
|
||||
func NewImageStreamer(config *ImageStreamerConfig) *ImageStreamer {
|
||||
if config == nil {
|
||||
config = &ImageStreamerConfig{}
|
||||
}
|
||||
|
||||
concurrency := config.Concurrency
|
||||
if concurrency <= 0 {
|
||||
cfg := GetConfig()
|
||||
concurrency = cfg.Download.MaxImages
|
||||
if concurrency <= 0 {
|
||||
concurrency = 10
|
||||
}
|
||||
}
|
||||
|
||||
remoteOptions := []remote.Option{
|
||||
remote.WithAuth(authn.Anonymous),
|
||||
remote.WithTransport(GetGlobalHTTPClient().Transport),
|
||||
}
|
||||
|
||||
return &ImageStreamer{
|
||||
concurrency: concurrency,
|
||||
remoteOptions: remoteOptions,
|
||||
}
|
||||
}
|
||||
|
||||
// StreamOptions 下载选项
|
||||
type StreamOptions struct {
|
||||
Platform string
|
||||
Compression bool
|
||||
}
|
||||
|
||||
// StreamImageToWriter 流式下载镜像到Writer
|
||||
func (is *ImageStreamer) StreamImageToWriter(ctx context.Context, imageRef string, writer io.Writer, options *StreamOptions) error {
|
||||
if options == nil {
|
||||
options = &StreamOptions{}
|
||||
}
|
||||
|
||||
ref, err := name.ParseReference(imageRef)
|
||||
if err != nil {
|
||||
return fmt.Errorf("解析镜像引用失败: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("开始下载镜像: %s", ref.String())
|
||||
|
||||
contextOptions := append(is.remoteOptions, remote.WithContext(ctx))
|
||||
|
||||
desc, err := is.getImageDescriptor(ref, contextOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取镜像描述失败: %w", err)
|
||||
}
|
||||
switch desc.MediaType {
|
||||
case types.OCIImageIndex, types.DockerManifestList:
|
||||
return is.streamMultiArchImage(ctx, desc, writer, options, contextOptions)
|
||||
case types.OCIManifestSchema1, types.DockerManifestSchema2:
|
||||
return is.streamSingleImage(ctx, desc, writer, options, contextOptions)
|
||||
default:
|
||||
return is.streamSingleImage(ctx, desc, writer, options, contextOptions)
|
||||
}
|
||||
}
|
||||
|
||||
// getImageDescriptor 获取镜像描述符
|
||||
func (is *ImageStreamer) getImageDescriptor(ref name.Reference, options []remote.Option) (*remote.Descriptor, error) {
|
||||
if isCacheEnabled() {
|
||||
var reference string
|
||||
if tagged, ok := ref.(name.Tag); ok {
|
||||
reference = tagged.TagStr()
|
||||
} else if digested, ok := ref.(name.Digest); ok {
|
||||
reference = digested.DigestStr()
|
||||
}
|
||||
|
||||
if reference != "" {
|
||||
cacheKey := buildManifestCacheKey(ref.Context().String(), reference)
|
||||
if cachedItem := globalCache.Get(cacheKey); cachedItem != nil {
|
||||
desc := &remote.Descriptor{
|
||||
Manifest: cachedItem.Data,
|
||||
MediaType: types.MediaType(cachedItem.ContentType),
|
||||
}
|
||||
log.Printf("使用缓存的manifest: %s", ref.String())
|
||||
return desc, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
desc, err := remote.Get(ref, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isCacheEnabled() {
|
||||
var reference string
|
||||
if tagged, ok := ref.(name.Tag); ok {
|
||||
reference = tagged.TagStr()
|
||||
} else if digested, ok := ref.(name.Digest); ok {
|
||||
reference = digested.DigestStr()
|
||||
}
|
||||
|
||||
if reference != "" {
|
||||
cacheKey := buildManifestCacheKey(ref.Context().String(), reference)
|
||||
ttl := getManifestTTL(reference)
|
||||
headers := map[string]string{
|
||||
"Docker-Content-Digest": desc.Digest.String(),
|
||||
}
|
||||
globalCache.Set(cacheKey, desc.Manifest, string(desc.MediaType), headers, ttl)
|
||||
log.Printf("缓存manifest: %s (TTL: %v)", ref.String(), ttl)
|
||||
}
|
||||
}
|
||||
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
// StreamImageToGin 流式响应到Gin
|
||||
func (is *ImageStreamer) StreamImageToGin(ctx context.Context, imageRef string, c *gin.Context, options *StreamOptions) error {
|
||||
if options == nil {
|
||||
options = &StreamOptions{}
|
||||
}
|
||||
|
||||
filename := strings.ReplaceAll(imageRef, "/", "_") + ".docker"
|
||||
c.Header("Content-Type", "application/octet-stream")
|
||||
c.Header("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
|
||||
|
||||
if options.Compression {
|
||||
c.Header("Content-Encoding", "gzip")
|
||||
}
|
||||
|
||||
return is.StreamImageToWriter(ctx, imageRef, c.Writer, options)
|
||||
}
|
||||
|
||||
// streamMultiArchImage 处理多架构镜像
|
||||
func (is *ImageStreamer) streamMultiArchImage(ctx context.Context, desc *remote.Descriptor, writer io.Writer, options *StreamOptions, remoteOptions []remote.Option) error {
|
||||
index, err := desc.ImageIndex()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取镜像索引失败: %w", err)
|
||||
}
|
||||
|
||||
manifest, err := index.IndexManifest()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取索引清单失败: %w", err)
|
||||
}
|
||||
|
||||
// 选择合适的平台
|
||||
var selectedDesc *v1.Descriptor
|
||||
for _, m := range manifest.Manifests {
|
||||
if m.Platform == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if options.Platform != "" {
|
||||
platformParts := strings.Split(options.Platform, "/")
|
||||
if len(platformParts) == 2 &&
|
||||
m.Platform.OS == platformParts[0] &&
|
||||
m.Platform.Architecture == platformParts[1] {
|
||||
selectedDesc = &m
|
||||
break
|
||||
}
|
||||
} else if m.Platform.OS == "linux" && m.Platform.Architecture == "amd64" {
|
||||
selectedDesc = &m
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if selectedDesc == nil && len(manifest.Manifests) > 0 {
|
||||
selectedDesc = &manifest.Manifests[0]
|
||||
}
|
||||
|
||||
if selectedDesc == nil {
|
||||
return fmt.Errorf("未找到合适的平台镜像")
|
||||
}
|
||||
|
||||
img, err := index.Image(selectedDesc.Digest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取选中镜像失败: %w", err)
|
||||
}
|
||||
|
||||
return is.streamImageLayers(ctx, img, writer, options)
|
||||
}
|
||||
|
||||
// streamSingleImage 处理单架构镜像
|
||||
func (is *ImageStreamer) streamSingleImage(ctx context.Context, desc *remote.Descriptor, writer io.Writer, options *StreamOptions, remoteOptions []remote.Option) error {
|
||||
img, err := desc.Image()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取镜像失败: %w", err)
|
||||
}
|
||||
|
||||
return is.streamImageLayers(ctx, img, writer, options)
|
||||
}
|
||||
|
||||
// streamImageLayers 处理镜像层
|
||||
func (is *ImageStreamer) streamImageLayers(ctx context.Context, img v1.Image, writer io.Writer, options *StreamOptions) error {
|
||||
var finalWriter io.Writer = writer
|
||||
|
||||
if options.Compression {
|
||||
gzWriter := gzip.NewWriter(writer)
|
||||
defer gzWriter.Close()
|
||||
finalWriter = gzWriter
|
||||
}
|
||||
|
||||
tarWriter := tar.NewWriter(finalWriter)
|
||||
defer tarWriter.Close()
|
||||
|
||||
configFile, err := img.ConfigFile()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取镜像配置失败: %w", err)
|
||||
}
|
||||
|
||||
layers, err := img.Layers()
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取镜像层失败: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("镜像包含 %d 层", len(layers))
|
||||
|
||||
return is.streamDockerFormat(ctx, tarWriter, img, layers, configFile)
|
||||
}
|
||||
|
||||
// streamDockerFormat 生成Docker格式
|
||||
func (is *ImageStreamer) streamDockerFormat(ctx context.Context, tarWriter *tar.Writer, img v1.Image, layers []v1.Layer, configFile *v1.ConfigFile) error {
|
||||
configDigest, err := img.ConfigName()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
configData, err := json.Marshal(configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
configHeader := &tar.Header{
|
||||
Name: configDigest.String() + ".json",
|
||||
Size: int64(len(configData)),
|
||||
Mode: 0644,
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(configHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tarWriter.Write(configData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
layerDigests := make([]string, len(layers))
|
||||
for i, layer := range layers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
digest, err := layer.Digest()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
layerDigests[i] = digest.String()
|
||||
|
||||
layerDir := digest.String()
|
||||
layerHeader := &tar.Header{
|
||||
Name: layerDir + "/",
|
||||
Typeflag: tar.TypeDir,
|
||||
Mode: 0755,
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(layerHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
layerReader, err := layer.Uncompressed()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer layerReader.Close()
|
||||
|
||||
size, err := layer.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
layerTarHeader := &tar.Header{
|
||||
Name: layerDir + "/layer.tar",
|
||||
Size: size,
|
||||
Mode: 0644,
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(layerTarHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := io.Copy(tarWriter, layerReader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("已处理层 %d/%d", i+1, len(layers))
|
||||
}
|
||||
|
||||
|
||||
manifest := []map[string]interface{}{{
|
||||
"Config": configDigest.String() + ".json",
|
||||
"RepoTags": []string{"imported:latest"},
|
||||
"Layers": func() []string {
|
||||
var layers []string
|
||||
for _, digest := range layerDigests {
|
||||
layers = append(layers, digest+"/layer.tar")
|
||||
}
|
||||
return layers
|
||||
}(),
|
||||
}}
|
||||
|
||||
manifestData, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
manifestHeader := &tar.Header{
|
||||
Name: "manifest.json",
|
||||
Size: int64(len(manifestData)),
|
||||
Mode: 0644,
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(manifestHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tarWriter.Write(manifestData)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
|
||||
var globalImageStreamer *ImageStreamer
|
||||
|
||||
// initImageStreamer 初始化镜像下载器
|
||||
func initImageStreamer() {
|
||||
globalImageStreamer = NewImageStreamer(nil)
|
||||
log.Printf("镜像下载器初始化完成,并发数: %d,缓存: %v",
|
||||
globalImageStreamer.concurrency, isCacheEnabled())
|
||||
}
|
||||
|
||||
// formatPlatformText 格式化平台文本
|
||||
func formatPlatformText(platform string) string {
|
||||
if platform == "" {
|
||||
return "自动选择"
|
||||
}
|
||||
return platform
|
||||
}
|
||||
|
||||
// initImageTarRoutes 初始化镜像下载路由
|
||||
func initImageTarRoutes(router *gin.Engine) {
|
||||
imageAPI := router.Group("/api/image")
|
||||
{
|
||||
imageAPI.GET("/download/:image", RateLimitMiddleware(globalLimiter), handleDirectImageDownload)
|
||||
imageAPI.GET("/info/:image", RateLimitMiddleware(globalLimiter), handleImageInfo)
|
||||
imageAPI.POST("/batch", RateLimitMiddleware(globalLimiter), handleSimpleBatchDownload)
|
||||
}
|
||||
}
|
||||
|
||||
// handleDirectImageDownload 处理单镜像下载
|
||||
func handleDirectImageDownload(c *gin.Context) {
|
||||
imageParam := c.Param("image")
|
||||
if imageParam == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少镜像参数"})
|
||||
return
|
||||
}
|
||||
|
||||
imageRef := strings.ReplaceAll(imageParam, "_", "/")
|
||||
platform := c.Query("platform")
|
||||
tag := c.DefaultQuery("tag")
|
||||
|
||||
if tag != "" && !strings.Contains(imageRef, ":") && !strings.Contains(imageRef, "@") {
|
||||
imageRef = imageRef + ":" + tag
|
||||
} else if !strings.Contains(imageRef, ":") && !strings.Contains(imageRef, "@") {
|
||||
imageRef = imageRef + ":latest"
|
||||
}
|
||||
|
||||
if _, err := name.ParseReference(imageRef); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "镜像引用格式错误: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
options := &StreamOptions{
|
||||
Platform: platform,
|
||||
Compression: false,
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
log.Printf("下载镜像: %s (平台: %s)", imageRef, formatPlatformText(platform))
|
||||
|
||||
if err := globalImageStreamer.StreamImageToGin(ctx, imageRef, c, options); err != nil {
|
||||
log.Printf("镜像下载失败: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "镜像下载失败: " + err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// handleSimpleBatchDownload 处理批量下载
|
||||
func handleSimpleBatchDownload(c *gin.Context) {
|
||||
var req struct {
|
||||
Images []string `json:"images" binding:"required"`
|
||||
Platform string `json:"platform"`
|
||||
}
|
||||
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "请求参数错误: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if len(req.Images) == 0 {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "镜像列表不能为空"})
|
||||
return
|
||||
}
|
||||
|
||||
cfg := GetConfig()
|
||||
if len(req.Images) > cfg.Download.MaxImages {
|
||||
c.JSON(http.StatusBadRequest, gin.H{
|
||||
"error": fmt.Sprintf("镜像数量超过限制,最大允许: %d", cfg.Download.MaxImages),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
options := &StreamOptions{
|
||||
Platform: req.Platform,
|
||||
Compression: true,
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
log.Printf("批量下载 %d 个镜像 (平台: %s)", len(req.Images), formatPlatformText(req.Platform))
|
||||
|
||||
filename := fmt.Sprintf("batch_%d_images.docker.gz", len(req.Images))
|
||||
|
||||
c.Header("Content-Type", "application/octet-stream")
|
||||
c.Header("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
|
||||
c.Header("Content-Encoding", "gzip")
|
||||
|
||||
if err := globalImageStreamer.StreamMultipleImages(ctx, req.Images, c.Writer, options); err != nil {
|
||||
log.Printf("批量镜像下载失败: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "批量镜像下载失败: " + err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// handleImageInfo 处理镜像信息查询
|
||||
func handleImageInfo(c *gin.Context) {
|
||||
imageParam := c.Param("image")
|
||||
if imageParam == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少镜像参数"})
|
||||
return
|
||||
}
|
||||
|
||||
imageRef := strings.ReplaceAll(imageParam, "_", "/")
|
||||
tag := c.DefaultQuery("tag", "latest")
|
||||
|
||||
if !strings.Contains(imageRef, ":") && !strings.Contains(imageRef, "@") {
|
||||
imageRef = imageRef + ":" + tag
|
||||
}
|
||||
|
||||
ref, err := name.ParseReference(imageRef)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "镜像引用格式错误: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
contextOptions := append(globalImageStreamer.remoteOptions, remote.WithContext(ctx))
|
||||
|
||||
desc, err := globalImageStreamer.getImageDescriptor(ref, contextOptions)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "获取镜像信息失败: " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
info := gin.H{
|
||||
"name": ref.String(),
|
||||
"mediaType": desc.MediaType,
|
||||
"digest": desc.Digest.String(),
|
||||
"size": desc.Size,
|
||||
}
|
||||
|
||||
if desc.MediaType == types.OCIImageIndex || desc.MediaType == types.DockerManifestList {
|
||||
index, err := desc.ImageIndex()
|
||||
if err == nil {
|
||||
manifest, err := index.IndexManifest()
|
||||
if err == nil {
|
||||
var platforms []string
|
||||
for _, m := range manifest.Manifests {
|
||||
if m.Platform != nil {
|
||||
platforms = append(platforms, m.Platform.OS+"/"+m.Platform.Architecture)
|
||||
}
|
||||
}
|
||||
info["platforms"] = platforms
|
||||
info["multiArch"] = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info["multiArch"] = false
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"success": true, "data": info})
|
||||
}
|
||||
|
||||
// StreamMultipleImages 批量下载多个镜像
|
||||
func (is *ImageStreamer) StreamMultipleImages(ctx context.Context, imageRefs []string, writer io.Writer, options *StreamOptions) error {
|
||||
if options == nil {
|
||||
options = &StreamOptions{}
|
||||
}
|
||||
|
||||
var finalWriter io.Writer = writer
|
||||
if options.Compression {
|
||||
gzWriter := gzip.NewWriter(writer)
|
||||
defer gzWriter.Close()
|
||||
finalWriter = gzWriter
|
||||
}
|
||||
|
||||
tarWriter := tar.NewWriter(finalWriter)
|
||||
defer tarWriter.Close()
|
||||
|
||||
for i, imageRef := range imageRefs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
log.Printf("处理镜像 %d/%d: %s", i+1, len(imageRefs), imageRef)
|
||||
|
||||
dirName := fmt.Sprintf("image_%d_%s/", i, strings.ReplaceAll(imageRef, "/", "_"))
|
||||
|
||||
dirHeader := &tar.Header{
|
||||
Name: dirName,
|
||||
Typeflag: tar.TypeDir,
|
||||
Mode: 0755,
|
||||
}
|
||||
|
||||
if err := tarWriter.WriteHeader(dirHeader); err != nil {
|
||||
return fmt.Errorf("创建镜像目录失败: %w", err)
|
||||
}
|
||||
|
||||
if err := is.StreamImageToWriter(ctx, imageRef, tarWriter, &StreamOptions{
|
||||
Platform: options.Platform,
|
||||
Compression: false,
|
||||
}); err != nil {
|
||||
log.Printf("下载镜像 %s 失败: %v", imageRef, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user