API for file upload. #116

This commit is contained in:
Andrey Smirnov
2014-10-16 00:04:50 +04:00
parent eef44f5cd5
commit 9c834f410c
4 changed files with 192 additions and 5 deletions
+70 -5
View File
@@ -9,16 +9,24 @@ import (
"strings"
)
func verifyDir(c *gin.Context) bool {
dir := c.Params.ByName("dir")
dir = filepath.Clean(dir)
for _, part := range strings.Split(dir, string(filepath.Separator)) {
func verifyPath(path string) bool {
path = filepath.Clean(path)
for _, part := range strings.Split(path, string(filepath.Separator)) {
if part == ".." || part == "." {
c.Fail(400, fmt.Errorf("wrong dir"))
return false
}
}
return true
}
func verifyDir(c *gin.Context) bool {
if !verifyPath(c.Params.ByName("dir")) {
c.Fail(400, fmt.Errorf("wrong dir"))
return false
}
return true
}
@@ -106,15 +114,72 @@ func apiFilesUpload(c *gin.Context) {
// GET /files/:dir
func apiFilesListFiles(c *gin.Context) {
if !verifyDir(c) {
return
}
list := []string{}
root := filepath.Join(context.UploadPath(), c.Params.ByName("dir"))
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if path == root {
return nil
}
list = append(list, filepath.Base(path))
return nil
})
if err != nil {
if os.IsNotExist(err) {
c.Fail(404, err)
} else {
c.Fail(500, err)
}
return
}
c.JSON(200, list)
}
// DELETE /files/:dir
func apiFilesDeleteDir(c *gin.Context) {
if !verifyDir(c) {
return
}
err := os.RemoveAll(filepath.Join(context.UploadPath(), c.Params.ByName("dir")))
if err != nil {
c.Fail(500, err)
return
}
c.JSON(200, gin.H{})
}
// DELETE /files/:dir/:name
func apiFilesDeleteFile(c *gin.Context) {
if !verifyDir(c) {
return
}
if !verifyPath(c.Params.ByName("name")) {
c.Fail(400, fmt.Errorf("wrong file"))
return
}
err := os.Remove(filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("name")))
if err != nil {
if err, ok := err.(*os.PathError); !ok || !os.IsNotExist(err.Err) {
c.Fail(500, err)
return
}
}
c.JSON(200, gin.H{})
}
+23
View File
@@ -3,6 +3,9 @@ import time
import json
import random
import string
import os
import inspect
import shutil
try:
import requests
@@ -27,6 +30,9 @@ class APITest(BaseTest):
APITest.aptly_server = self._start_process("aptly api serve -listen=%s" % (self.base_url),)
time.sleep(1)
if os.path.exists(os.path.join(os.environ["HOME"], ".aptly", "upload")):
shutil.rmtree(os.path.join(os.environ["HOME"], ".aptly", "upload"))
def run(self):
pass
@@ -44,6 +50,23 @@ class APITest(BaseTest):
def delete(self, uri, *args, **kwargs):
return requests.delete("http://%s%s" % (self.base_url, uri), *args, **kwargs)
def upload(self, uri, *filenames, **kwargs):
upload_name = kwargs.pop("upload_name", None)
directory = kwargs.pop("directory", "files")
assert kwargs == {}
files = {}
for filename in filenames:
fp = open(os.path.join(os.path.dirname(inspect.getsourcefile(BaseTest)), directory, filename), "rb")
if upload_name is not None:
upload_filename = upload_name
else:
upload_filename = filename
files[upload_filename] = (upload_filename, fp)
return self.post(uri, files=files)
@classmethod
def shutdown_class(cls):
if cls.aptly_server is not None:
+1
View File
@@ -3,3 +3,4 @@ Testing aptly REST API
"""
from .repos import *
from .files import *
+98
View File
@@ -0,0 +1,98 @@
from api_lib import APITest
class FilesAPITestUpload(APITest):
"""
POST /files/:dir
"""
def check(self):
d = self.random_name()
resp = self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc")
self.check_equal(resp.status_code, 200)
self.check_equal(resp.json(), [d + '/pyspi_0.6.1-1.3.dsc'])
self.check_exists("upload/" + d + '/pyspi_0.6.1-1.3.dsc')
class FilesAPITestUploadMulti(APITest):
"""
POST /files/:dir, GET /files/:dir multi files
"""
def check(self):
d = self.random_name()
self.check_equal(self.get("/api/files/" + d).status_code, 404)
resp = self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz")
self.check_equal(resp.status_code, 200)
self.check_equal(sorted(resp.json()),
[d + '/pyspi_0.6.1-1.3.diff.gz', d + '/pyspi_0.6.1-1.3.dsc', d + '/pyspi_0.6.1.orig.tar.gz'])
self.check_exists("upload/" + d + '/pyspi_0.6.1-1.3.dsc')
self.check_exists("upload/" + d + '/pyspi_0.6.1-1.3.diff.gz')
self.check_exists("upload/" + d + '/pyspi_0.6.1.orig.tar.gz')
resp = self.get("/api/files/" + d)
self.check_equal(resp.status_code, 200)
self.check_equal(sorted(resp.json()),
['pyspi_0.6.1-1.3.diff.gz', 'pyspi_0.6.1-1.3.dsc', 'pyspi_0.6.1.orig.tar.gz'])
class FilesAPITestList(APITest):
"""
GET /files/
"""
def check(self):
d1, d2, d3 = self.random_name(), self.random_name(), self.random_name()
resp = self.get("/api/files")
self.check_equal(resp.status_code, 200)
self.check_equal(resp.json(), [])
self.check_equal(self.upload("/api/files/" + d1, "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.upload("/api/files/" + d2, "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.upload("/api/files/" + d3, "pyspi_0.6.1-1.3.dsc").status_code, 200)
resp = self.get("/api/files")
self.check_equal(resp.status_code, 200)
self.check_equal(sorted(resp.json()), sorted([d1, d2, d3]))
class FilesAPITestDelete(APITest):
"""
DELETE /files/:dir, DELETE /files/:dir/:name
"""
def check(self):
d1, d2 = self.random_name(), self.random_name()
self.check_equal(self.get("/api/files").json(), [])
self.check_equal(self.delete("/api/files/" + d1).status_code, 200)
self.check_equal(self.delete("/api/files/" + d1 + "/" + "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.upload("/api/files/" + d1, "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.upload("/api/files/" + d2, "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.delete("/api/files/" + d1).status_code, 200)
self.check_equal(self.get("/api/files").json(), [d2])
self.check_equal(self.delete("/api/files/" + d2 + "/" + "no-such-file").status_code, 200)
self.check_equal(self.get("/api/files/" + d2).json(), ["pyspi_0.6.1-1.3.dsc"])
self.check_equal(self.delete("/api/files/" + d2 + "/" + "pyspi_0.6.1-1.3.dsc").status_code, 200)
self.check_equal(self.get("/api/files").json(), [d2])
self.check_equal(self.get("/api/files/" + d2).json(), [])
class FilesAPITestSecurity(APITest):
"""
delete & upload security
"""
def check(self):
self.check_equal(self.delete("/api/files/.").status_code, 400)
self.check_equal(self.delete("/api/files").status_code, 404)
self.check_equal(self.delete("/api/files/../.").status_code, 400)
self.check_equal(self.delete("/api/files/./..").status_code, 400)
self.check_equal(self.delete("/api/files/dir/..").status_code, 400)